ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adorosz...@apache.org
Subject [37/50] [abbrv] ambari git commit: AMBARI-21033 Log Search use POJOs for input configuration (mgergely)
Date Tue, 23 May 2017 09:53:35 GMT
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java
new file mode 100644
index 0000000..51c7ec8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public class InputFileBaseDescriptorImpl extends InputDescriptorImpl implements InputFileBaseDescriptor {
+  @Expose
+  @SerializedName("checkpoint_interval_ms")
+  private Integer checkpointIntervalMs;
+
+  @Expose
+  @SerializedName("process_file")
+  private Boolean processFile;
+
+  @Expose
+  @SerializedName("copy_file")
+  private Boolean copyFile;
+
+  @Override
+  public Boolean getProcessFile() {
+    return processFile;
+  }
+
+  public void setProcessFile(Boolean processFile) {
+    this.processFile = processFile;
+  }
+
+  @Override
+  public Boolean getCopyFile() {
+    return copyFile;
+  }
+
+  public void setCopyFile(Boolean copyFile) {
+    this.copyFile = copyFile;
+  }
+
+  @Override
+  public Integer getCheckpointIntervalMs() {
+    return checkpointIntervalMs;
+  }
+
+  public void setCheckpointIntervalMs(Integer checkpointIntervalMs) {
+    this.checkpointIntervalMs = checkpointIntervalMs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
new file mode 100644
index 0000000..3bfd161
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
+
+public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl implements InputFileDescriptor {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java
new file mode 100644
index 0000000..277a57c
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public class InputS3FileDescriptorImpl extends InputFileBaseDescriptorImpl implements InputS3FileDescriptor {
+  @Expose
+  @SerializedName("s3_access_key")
+  private String s3AccessKey;
+
+  @Expose
+  @SerializedName("s3_secret_key")
+  private String s3SecretKey;
+
+  @Override
+  public String getS3AccessKey() {
+    return s3AccessKey;
+  }
+
+  public void setS3AccessKey(String s3AccessKey) {
+    this.s3AccessKey = s3AccessKey;
+  }
+
+  @Override
+  public String getS3SecretKey() {
+    return s3SecretKey;
+  }
+
+  public void setS3SecretKey(String s3SecretKey) {
+    this.s3SecretKey = s3SecretKey;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java
new file mode 100644
index 0000000..9daad2b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public class MapDateDescriptorImpl implements MapDateDescriptor {
+  @Override
+  public String getJsonName() {
+    return "map_date";
+  }
+
+  @Expose
+  @SerializedName("source_date_pattern")
+  private String sourceDatePattern;
+
+  @Expose
+  @SerializedName("target_date_pattern")
+  private String targetDatePattern;
+
+  @Override
+  public String getSourceDatePattern() {
+    return sourceDatePattern;
+  }
+
+  public void setSourceDatePattern(String sourceDatePattern) {
+    this.sourceDatePattern = sourceDatePattern;
+  }
+
+  @Override
+  public String getTargetDatePattern() {
+    return targetDatePattern;
+  }
+
+  public void setTargetDatePattern(String targetDatePattern) {
+    this.targetDatePattern = targetDatePattern;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java
new file mode 100644
index 0000000..4a8d746
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public class MapFieldCopyDescriptorImpl implements MapFieldCopyDescriptor {
+  @Override
+  public String getJsonName() {
+    return "map_fieldcopy";
+  }
+
+  @Expose
+  @SerializedName("copy_name")
+  private String copyName;
+
+  @Override
+  public String getCopyName() {
+    return copyName;
+  }
+
+  public void setCopyName(String copyName) {
+    this.copyName = copyName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java
new file mode 100644
index 0000000..333cb67
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public class MapFieldNameDescriptorImpl implements MapFieldNameDescriptor {
+  @Override
+  public String getJsonName() {
+    return "map_fieldname";
+  }
+
+  @Expose
+  @SerializedName("new_fieldname")
+  private String newFieldName;
+
+  @Override
+  public String getNewFieldName() {
+    return newFieldName;
+  }
+
+  public void setNewFieldName(String newFieldName) {
+    this.newFieldName = newFieldName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java
new file mode 100644
index 0000000..599e152
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public class MapFieldValueDescriptorImpl implements MapFieldValueDescriptor {
+  @Override
+  public String getJsonName() {
+    return "map_fieldvalue";
+  }
+
+  @Expose
+  @SerializedName("pre_value")
+  private String preValue;
+
+  @Expose
+  @SerializedName("post_value")
+  private String postValue;
+
+  @Override
+  public String getPreValue() {
+    return preValue;
+  }
+
+  public void setPreValue(String preValue) {
+    this.preValue = preValue;
+  }
+
+  @Override
+  public String getPostValue() {
+    return postValue;
+  }
+
+  public void setPostValue(String postValue) {
+    this.postValue = postValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
new file mode 100644
index 0000000..32aded8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSerializationContext;
+
+public class PostMapValuesAdapter implements JsonDeserializer<List<PostMapValuesImpl>>, JsonSerializer<List<PostMapValuesImpl>> {
+  @Override
+  public List<PostMapValuesImpl> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) {
+    List<PostMapValuesImpl> vals = new ArrayList<>();
+    if (json.isJsonArray()) {
+      for (JsonElement e : json.getAsJsonArray()) {
+        vals.add(createPostMapValues(e, context));
+      }
+    } else if (json.isJsonObject()) {
+      vals.add(createPostMapValues(json, context));
+    } else {
+      throw new RuntimeException("Unexpected JSON type: " + json.getClass());
+    }
+    return vals;
+  }
+
+  private PostMapValuesImpl createPostMapValues(JsonElement e, JsonDeserializationContext context) {
+    List<MapFieldDescriptor> mappers = new ArrayList<>();
+    for (Map.Entry<String, JsonElement> m : e.getAsJsonObject().entrySet()) {
+      switch (m.getKey()) {
+        case "map_date":
+          mappers.add((MapDateDescriptorImpl)context.deserialize(m.getValue(), MapDateDescriptorImpl.class));
+          break;
+        case "map_fieldcopy":
+          mappers.add((MapFieldCopyDescriptorImpl)context.deserialize(m.getValue(), MapFieldCopyDescriptorImpl.class));
+          break;
+        case "map_fieldname":
+          mappers.add((MapFieldNameDescriptorImpl)context.deserialize(m.getValue(), MapFieldNameDescriptorImpl.class));
+          break;
+        case "map_fieldvalue":
+          mappers.add((MapFieldValueDescriptorImpl)context.deserialize(m.getValue(), MapFieldValueDescriptorImpl.class));
+          break;
+        default:
+          System.out.println("Unknown key: " + m.getKey());
+      }
+    }
+    
+    PostMapValuesImpl postMapValues = new PostMapValuesImpl();
+    postMapValues.setMappers(mappers);
+    return postMapValues;
+  }
+
+  @Override
+  public JsonElement serialize(List<PostMapValuesImpl> src, Type typeOfSrc, JsonSerializationContext context) {
+    if (src.size() == 1) {
+      return createMapperObject(src.get(0), context);
+    } else {
+      JsonArray jsonArray = new JsonArray();
+      for (PostMapValuesImpl postMapValues : src) {
+        jsonArray.add(createMapperObject(postMapValues, context));
+      }
+      return jsonArray;
+    }
+  }
+
+  private JsonElement createMapperObject(PostMapValuesImpl postMapValues, JsonSerializationContext context) {
+    JsonObject jsonObject = new JsonObject();
+    for (MapFieldDescriptor m : postMapValues.getMappers()) {
+      jsonObject.add(((MapFieldDescriptor)m).getJsonName(), context.serialize(m));
+    }
+    return jsonObject;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java
new file mode 100644
index 0000000..4d2254a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.util.List;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
+
+import com.google.gson.annotations.Expose;
+
+public class PostMapValuesImpl implements PostMapValues {
+  @Expose
+  private List<MapFieldDescriptor> mappers;
+
+  public List<MapFieldDescriptor> getMappers() {
+    return mappers;
+  }
+
+  public void setMappers(List<MapFieldDescriptor> mappers) {
+    this.mappers = mappers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index c853f42..8d7c69f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -34,7 +34,7 @@ import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.metrics.MetricsManager;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.SSLUtil;
-import org.apache.curator.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.log4j.Logger;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
index 68897e8..cfcc199 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
@@ -20,54 +20,19 @@
 package org.apache.ambari.logfeeder.common;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
 import org.apache.log4j.Priority;
 
 
-public abstract class ConfigBlock {
-  private static final Logger LOG = Logger.getLogger(ConfigBlock.class);
-
-  private boolean drain = false;
-
+public abstract class ConfigBlock extends ConfigItem {
   protected Map<String, Object> configs;
   protected Map<String, String> contextFields = new HashMap<String, String>();
-  public MetricData statMetric = new MetricData(getStatMetricName(), false);
-  protected String getStatMetricName() {
-    return null;
-  }
-  
   public ConfigBlock() {
   }
 
-  /**
-   * Used while logging. Keep it short and meaningful
-   */
-  public abstract String getShortDescription();
-
-  /**
-   * Every implementor need to give name to the thread they create
-   */
-  public String getNameForThread() {
-    return this.getClass().getSimpleName();
-  }
-
-  public void addMetricsContainers(List<MetricData> metricsList) {
-    metricsList.add(statMetric);
-  }
-
-  /**
-   * This method needs to be overwritten by deriving classes.
-   */
-  public void init() throws Exception {
-  }
-
   public void loadConfig(Map<String, Object> map) {
     configs = LogFeederUtil.cloneObject(map);
 
@@ -81,46 +46,6 @@ public abstract class ConfigBlock {
     return configs;
   }
 
-  @SuppressWarnings("unchecked")
-  public boolean isEnabled() {
-    boolean isEnabled = getBooleanValue("is_enabled", true);
-    if (isEnabled) {
-      // Let's check for static conditions
-      Map<String, Object> conditions = (Map<String, Object>) configs.get("conditions");
-      boolean allow = true;
-      if (MapUtils.isNotEmpty(conditions)) {
-        allow = false;
-        for (String conditionType : conditions.keySet()) {
-          if (conditionType.equalsIgnoreCase("fields")) {
-            Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
-            for (String fieldName : fields.keySet()) {
-              Object values = fields.get(fieldName);
-              if (values instanceof String) {
-                allow = isFieldConditionMatch(fieldName, (String) values);
-              } else {
-                List<String> listValues = (List<String>) values;
-                for (String stringValue : listValues) {
-                  allow = isFieldConditionMatch(fieldName, stringValue);
-                  if (allow) {
-                    break;
-                  }
-                }
-              }
-              if (allow) {
-                break;
-              }
-            }
-          }
-          if (allow) {
-            break;
-          }
-        }
-        isEnabled = allow;
-      }
-    }
-    return isEnabled;
-  }
-
   public boolean isFieldConditionMatch(String fieldName, String stringValue) {
     boolean allow = false;
     String fieldValue = (String) configs.get(fieldName);
@@ -207,27 +132,17 @@ public abstract class ConfigBlock {
     return retValue;
   }
 
-  public Map<String, String> getContextFields() {
-    return contextFields;
-  }
-
-  public void incrementStat(int count) {
-    statMetric.value += count;
-  }
-
-  public void logStatForMetric(MetricData metric, String prefixStr) {
-    LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription());
+  @Override
+  public boolean isEnabled() {
+    return getBooleanValue("is_enabled", true);
   }
 
-  public synchronized void logStat() {
-    logStatForMetric(statMetric, "Stat");
+  public Map<String, String> getContextFields() {
+    return contextFields;
   }
 
   public boolean logConfigs(Priority level) {
-    if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) {
-      return false;
-    }
-    if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) {
+    if (!super.logConfigs(level)) {
       return false;
     }
     LOG.log(level, "Printing configuration Block=" + getShortDescription());
@@ -235,12 +150,4 @@ public abstract class ConfigBlock {
     LOG.log(level, "contextFields=" + contextFields);
     return true;
   }
-
-  public boolean isDrain() {
-    return drain;
-  }
-
-  public void setDrain(boolean drain) {
-    this.drain = drain;
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index effe980..726ff27 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -46,13 +46,19 @@ import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
 import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterDescriptorImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.gson.reflect.TypeToken;
 
 public class ConfigHandler implements InputConfigMonitor {
@@ -61,10 +67,11 @@ public class ConfigHandler implements InputConfigMonitor {
   private final OutputManager outputManager = new OutputManager();
   private final InputManager inputManager = new InputManager();
 
-  public static Map<String, Object> globalConfigs = new HashMap<>();
+  private final Map<String, Object> globalConfigs = new HashMap<>();
+  private final List<String> globalConfigJsons = new ArrayList<String>();
 
-  private final List<Map<String, Object>> inputConfigList = new ArrayList<>();
-  private final List<Map<String, Object>> filterConfigList = new ArrayList<>();
+  private final List<InputDescriptor> inputConfigList = new ArrayList<>();
+  private final List<FilterDescriptor> filterConfigList = new ArrayList<>();
   private final List<Map<String, Object>> outputConfigList = new ArrayList<>();
   
   private boolean simulateMode = false;
@@ -141,11 +148,12 @@ public class ConfigHandler implements InputConfigMonitor {
   }
   
   @Override
-  public void loadInputConfigs(String serviceName, String inputConfigData) throws Exception {
+  public void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception {
     inputConfigList.clear();
     filterConfigList.clear();
     
-    loadConfigs(inputConfigData);
+    inputConfigList.addAll(inputConfig.getInput());
+    filterConfigList.addAll(inputConfig.getFilter());
     
     if (simulateMode) {
       InputSimulate.loadTypeToFilePath(inputConfigList);
@@ -173,14 +181,7 @@ public class ConfigHandler implements InputConfigMonitor {
       switch (key) {
         case "global" :
           globalConfigs.putAll((Map<String, Object>) configMap.get(key));
-          break;
-        case "input" :
-          List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key);
-          inputConfigList.addAll(inputConfig);
-          break;
-        case "filter" :
-          List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key);
-          filterConfigList.addAll(filterConfig);
+          globalConfigJsons.add(configData);
           break;
         case "output" :
           List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
@@ -192,21 +193,28 @@ public class ConfigHandler implements InputConfigMonitor {
     }
   }
   
+  @Override
+  public List<String> getGlobalConfigJsons() {
+    return globalConfigJsons;
+  }
+  
   private void simulateIfNeeded() throws Exception {
     int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0);
     if (simulatedInputNumber == 0)
       return;
     
-    List<Map<String, Object>> simulateInputConfigList = new ArrayList<>();
+    InputConfigImpl simulateInputConfig = new InputConfigImpl();
+    List<InputDescriptorImpl> inputConfigDescriptors = new ArrayList<>();
+    simulateInputConfig.setInput(inputConfigDescriptors);
+    simulateInputConfig.setFilter(new ArrayList<FilterDescriptorImpl>());
     for (int i = 0; i < simulatedInputNumber; i++) {
-      HashMap<String, Object> mapList = new HashMap<String, Object>();
-      mapList.put("source", "simulate");
-      mapList.put("rowtype", "service");
-      simulateInputConfigList.add(mapList);
+      InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
+      inputDescriptor.setSource("simulate");
+      inputDescriptor.setRowtype("service");
+      inputDescriptor.setAddFields(new HashMap<String, String>());
+      inputConfigDescriptors.add(inputDescriptor);
     }
     
-    Map<String, List<Map<String, Object>>> simulateInputConfigMap = ImmutableMap.of("input", simulateInputConfigList);
-    String simulateInputConfig = LogFeederUtil.getGson().toJson(simulateInputConfigMap);
     loadInputConfigs("Simulation", simulateInputConfig);
     
     simulateMode = true;
@@ -233,7 +241,7 @@ public class ConfigHandler implements InputConfigMonitor {
       output.loadConfig(map);
 
       // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
-      if (output.getBooleanValue("is_enabled", true)) {
+      if (output.isEnabled()) {
         output.logConfigs(Level.INFO);
         outputManager.add(output);
       } else {
@@ -243,24 +251,23 @@ public class ConfigHandler implements InputConfigMonitor {
   }
 
   private void loadInputs(String serviceName) {
-    for (Map<String, Object> map : inputConfigList) {
-      if (map == null) {
+    for (InputDescriptor inputDescriptor : inputConfigList) {
+      if (inputDescriptor == null) {
         continue;
       }
-      mergeBlocks(globalConfigs, map);
 
-      String value = (String) map.get("source");
-      if (StringUtils.isEmpty(value)) {
+      String source = (String) inputDescriptor.getSource();
+      if (StringUtils.isEmpty(source)) {
         LOG.error("Input block doesn't have source element");
         continue;
       }
-      Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT);
+      Input input = (Input) AliasUtil.getClassInstance(source, AliasType.INPUT);
       if (input == null) {
         LOG.error("Input object could not be found");
         continue;
       }
-      input.setType(value);
-      input.loadConfig(map);
+      input.setType(source);
+      input.loadConfig(inputDescriptor);
 
       if (input.isEnabled()) {
         input.setOutputManager(outputManager);
@@ -278,13 +285,20 @@ public class ConfigHandler implements InputConfigMonitor {
 
     List<Input> toRemoveInputList = new ArrayList<Input>();
     for (Input input : inputManager.getInputList(serviceName)) {
-      for (Map<String, Object> map : filterConfigList) {
-        if (map == null) {
+      for (FilterDescriptor filterDescriptor : filterConfigList) {
+        if (filterDescriptor == null) {
+          continue;
+        }
+        if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
+          LOG.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled");
+          continue;
+        }
+        if (!input.isFilterRequired(filterDescriptor)) {
+          LOG.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription());
           continue;
         }
-        mergeBlocks(globalConfigs, map);
 
-        String value = (String) map.get("filter");
+        String value = filterDescriptor.getFilter();
         if (StringUtils.isEmpty(value)) {
           LOG.error("Filter block doesn't have filter element");
           continue;
@@ -294,16 +308,12 @@ public class ConfigHandler implements InputConfigMonitor {
           LOG.error("Filter object could not be found");
           continue;
         }
-        filter.loadConfig(map);
+        filter.loadConfig(filterDescriptor);
         filter.setInput(input);
 
-        if (filter.isEnabled()) {
-          filter.setOutputManager(outputManager);
-          input.addFilter(filter);
-          filter.logConfigs(Level.INFO);
-        } else {
-          LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
-        }
+        filter.setOutputManager(outputManager);
+        input.addFilter(filter);
+        filter.logConfigs(Level.INFO);
       }
       
       if (input.getFirstFilter() == null) {
@@ -318,43 +328,25 @@ public class ConfigHandler implements InputConfigMonitor {
   }
 
   private void sortFilters() {
-    Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() {
-
+    Collections.sort(filterConfigList, new Comparator<FilterDescriptor>() {
       @Override
-      public int compare(Map<String, Object> o1, Map<String, Object> o2) {
-        Object o1Sort = o1.get("sort_order");
-        Object o2Sort = o2.get("sort_order");
+      public int compare(FilterDescriptor o1, FilterDescriptor o2) {
+        Integer o1Sort = o1.getSortOrder();
+        Integer o2Sort = o2.getSortOrder();
         if (o1Sort == null || o2Sort == null) {
           return 0;
         }
         
-        int o1Value = parseSort(o1, o1Sort);
-        int o2Value = parseSort(o2, o2Sort);
-        
-        return o1Value - o2Value;
-      }
-
-      private int parseSort(Map<String, Object> map, Object o) {
-        if (!(o instanceof Number)) {
-          try {
-            return (new Double(Double.parseDouble(o.toString()))).intValue();
-          } catch (Throwable t) {
-            LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString()
-              + ", map=" + map.toString());
-            return 0;
-          }
-        } else {
-          return ((Number) o).intValue();
-        }
+        return o1Sort - o2Sort;
       }
-    });
+    } );
   }
 
   private void assignOutputsToInputs(String serviceName) {
     Set<Output> usedOutputSet = new HashSet<Output>();
     for (Input input : inputManager.getInputList(serviceName)) {
       for (Output output : outputManager.getOutputs()) {
-        if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) {
+        if (input.isOutputRequired(output)) {
           usedOutputSet.add(output);
           input.addOutput(output);
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
new file mode 100644
index 0000000..5c20a8e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.common;
+
+import java.util.List;
+
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Priority;
+
+public abstract class ConfigItem {
+
+  protected static final Logger LOG = Logger.getLogger(ConfigBlock.class);
+  private boolean drain = false;
+  public MetricData statMetric = new MetricData(getStatMetricName(), false);
+
+  public ConfigItem() {
+    super();
+  }
+
+  protected String getStatMetricName() {
+    return null;
+  }
+
+  /**
+   * Used while logging. Keep it short and meaningful
+   */
+  public abstract String getShortDescription();
+
+  /**
+   * Every implementor need to give name to the thread they create
+   */
+  public String getNameForThread() {
+    return this.getClass().getSimpleName();
+  }
+
+  public void addMetricsContainers(List<MetricData> metricsList) {
+    metricsList.add(statMetric);
+  }
+
+  /**
+   * This method needs to be overwritten by deriving classes.
+   */
+  public void init() throws Exception {
+  }
+
+  public abstract boolean isEnabled();
+
+  public void incrementStat(int count) {
+    statMetric.value += count;
+  }
+
+  public void logStatForMetric(MetricData metric, String prefixStr) {
+    LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription());
+  }
+
+  public synchronized void logStat() {
+    logStatForMetric(statMetric, "Stat");
+  }
+
+  public boolean logConfigs(Priority level) {
+    if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) {
+      return false;
+    }
+    if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) {
+      return false;
+    }
+    return true;
+  }
+
+  public boolean isDrain() {
+    return drain;
+  }
+
+  public void setDrain(boolean drain) {
+    this.drain = drain;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index afd903e..fd02497 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.common.ConfigBlock;
+import org.apache.ambari.logfeeder.common.ConfigItem;
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
@@ -33,18 +33,28 @@ import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.output.OutputManager;
 import org.apache.ambari.logfeeder.util.AliasUtil;
 import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
-import org.apache.log4j.Logger;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.log4j.Priority;
 
-public abstract class Filter extends ConfigBlock {
-  private static final Logger LOG = Logger.getLogger(Filter.class);
-
+public abstract class Filter extends ConfigItem {
+  protected FilterDescriptor filterDescriptor;
   protected Input input;
   private Filter nextFilter = null;
   private OutputManager outputManager;
 
   private Map<String, List<Mapper>> postFieldValueMappers = new HashMap<String, List<Mapper>>();
 
+  public void loadConfig(FilterDescriptor filterDescriptor) {
+    this.filterDescriptor = filterDescriptor;
+  }
+
+  public FilterDescriptor getFilterDescriptor() {
+    return filterDescriptor;
+  }
+
   @Override
   public void init() throws Exception {
     super.init();
@@ -55,28 +65,22 @@ public abstract class Filter extends ConfigBlock {
     }
   }
 
-  @SuppressWarnings("unchecked")
   private void initializePostMapValues() {
-    Map<String, Object> postMapValues = (Map<String, Object>) getConfigValue("post_map_values");
+    Map<String, ? extends List<? extends PostMapValues>> postMapValues = filterDescriptor.getPostMapValues();
     if (postMapValues == null) {
       return;
     }
     for (String fieldName : postMapValues.keySet()) {
-      List<Map<String, Object>> mapList = null;
-      Object values = postMapValues.get(fieldName);
-      if (values instanceof List<?>) {
-        mapList = (List<Map<String, Object>>) values;
-      } else {
-        mapList = new ArrayList<Map<String, Object>>();
-        mapList.add((Map<String, Object>) values);
-      }
-      for (Map<String, Object> mapObject : mapList) {
-        for (String mapClassCode : mapObject.keySet()) {
+      List<? extends PostMapValues> values = postMapValues.get(fieldName);
+      for (PostMapValues pmv : values) {
+        for (MapFieldDescriptor mapFieldDescriptor : pmv.getMappers()) {
+          String mapClassCode = mapFieldDescriptor.getJsonName();
           Mapper mapper = (Mapper) AliasUtil.getClassInstance(mapClassCode, AliasType.MAPPER);
           if (mapper == null) {
-            break;
+            LOG.warn("Unknown mapper type: " + mapClassCode);
+            continue;
           }
-          if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapObject.get(mapClassCode))) {
+          if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapFieldDescriptor)) {
             List<Mapper> fieldMapList = postFieldValueMappers.get(fieldName);
             if (fieldMapList == null) {
               fieldMapList = new ArrayList<Mapper>();
@@ -156,15 +160,8 @@ public abstract class Filter extends ConfigBlock {
   }
 
   @Override
-  public boolean isFieldConditionMatch(String fieldName, String stringValue) {
-    if (!super.isFieldConditionMatch(fieldName, stringValue)) {
-      if (input != null) {
-        return input.isFieldConditionMatch(fieldName, stringValue);
-      } else {
-        return false;
-      }
-    }
-    return true;
+  public boolean isEnabled() {
+    return BooleanUtils.isNotFalse(filterDescriptor.isEnabled());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 7e2da70..70aea65 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -38,6 +38,8 @@ import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
+import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -75,11 +77,10 @@ public class FilterGrok extends Filter {
     super.init();
 
     try {
-      messagePattern = escapePattern(getStringValue("message_pattern"));
-      multilinePattern = escapePattern(getStringValue("multiline_pattern"));
-      sourceField = getStringValue("source_field");
-      removeSourceField = getBooleanValue("remove_source_field",
-        removeSourceField);
+      messagePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMessagePattern());
+      multilinePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMultilinePattern());
+      sourceField = ((FilterGrokDescriptor)filterDescriptor).getSourceField();
+      removeSourceField = BooleanUtils.toBooleanDefaultIfNull(filterDescriptor.isRemoveSourceField(), removeSourceField);
 
       LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " +
       getShortDescription());

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index 35f692e..cfccdeb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -25,12 +25,9 @@ import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.util.DateUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
 
 public class FilterJSON extends Filter {
   
-  private static final Logger LOG  = Logger.getLogger(FilterJSON.class);
-
   @Override
   public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException {
     Map<String, Object> jsonMap = null;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index b04a439..f2a4186 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -28,13 +28,11 @@ import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 
 public class FilterKeyValue extends Filter {
-  private static final Logger LOG = Logger.getLogger(FilterKeyValue.class);
-
   private String sourceField = null;
   private String valueSplit = "=";
   private String fieldSplit = "\t";
@@ -46,10 +44,10 @@ public class FilterKeyValue extends Filter {
   public void init() throws Exception {
     super.init();
 
-    sourceField = getStringValue("source_field");
-    valueSplit = getStringValue("value_split", valueSplit);
-    fieldSplit = getStringValue("field_split", fieldSplit);
-    valueBorders = getStringValue("value_borders");
+    sourceField = filterDescriptor.getSourceField();
+    valueSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getValueSplit(), valueSplit);
+    fieldSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getFieldSplit(), fieldSplit);
+    valueBorders = ((FilterKeyValueDescriptor)filterDescriptor).getValueBorders();
 
     LOG.info("init() done. source_field=" + sourceField + ", value_split=" + valueSplit + ", " + ", field_split=" +
         fieldSplit + ", " + getShortDescription());

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
index 41a1fa5..cfa1903 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -29,14 +29,14 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 
 public abstract class AbstractInputFile extends Input {
-  protected static final Logger LOG = Logger.getLogger(AbstractInputFile.class);
-
   private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000;
 
   protected File[] logFiles;
@@ -73,16 +73,16 @@ public abstract class AbstractInputFile extends Input {
 
     // Let's close the file and set it to true after we start monitoring it
     setClosed(true);
-    logPath = getStringValue("path");
-    tail = getBooleanValue("tail", tail);
-    checkPointIntervalMS = getIntValue("checkpoint.interval.ms", DEFAULT_CHECKPOINT_INTERVAL_MS);
+    logPath = inputDescriptor.getPath();
+    tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), tail);
+    checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)inputDescriptor).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS);
 
     if (StringUtils.isEmpty(logPath)) {
       LOG.error("path is empty for file input. " + getShortDescription());
       return;
     }
 
-    String startPosition = getStringValue("start_position");
+    String startPosition = inputDescriptor.getStartPosition();
     if (StringUtils.isEmpty(startPosition) || startPosition.equalsIgnoreCase("beginning") ||
         startPosition.equalsIgnoreCase("begining") || !tail) {
       isStartFromBegining = true;
@@ -313,7 +313,7 @@ public abstract class AbstractInputFile extends Input {
 
   @Override
   public String getShortDescription() {
-    return "input:source=" + getStringValue("source") + ", path=" +
+    return "input:source=" + inputDescriptor.getSource() + ", path=" +
         (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 9f54d8a..fba596d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -21,23 +21,25 @@ package org.apache.ambari.logfeeder.input;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.input.cache.LRUCache;
-import org.apache.ambari.logfeeder.common.ConfigBlock;
+import org.apache.ambari.logfeeder.common.ConfigItem;
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.output.Output;
 import org.apache.ambari.logfeeder.output.OutputManager;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
-
-public abstract class Input extends ConfigBlock implements Runnable {
-  private static final Logger LOG = Logger.getLogger(Input.class);
-
+import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.log4j.Priority;
+
+public abstract class Input extends ConfigItem implements Runnable {
   private static final boolean DEFAULT_TAIL = true;
   private static final boolean DEFAULT_USE_EVENT_MD5 = false;
   private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
@@ -47,12 +49,8 @@ public abstract class Input extends ConfigBlock implements Runnable {
   private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000;
   private static final String DEFAULT_CACHE_KEY_FIELD = "log_message";
 
-  private static final String CACHE_ENABLED = "cache_enabled";
-  private static final String CACHE_KEY_FIELD = "cache_key_field";
-  private static final String CACHE_LAST_DEDUP_ENABLED = "cache_last_dedup_enabled";
-  private static final String CACHE_SIZE = "cache_size";
-  private static final String CACHE_DEDUP_INTERVAL = "cache_dedup_interval";
-
+  protected InputDescriptor inputDescriptor;
+  
   protected InputManager inputManager;
   protected OutputManager outputManager;
   private List<Output> outputList = new ArrayList<Output>();
@@ -75,21 +73,12 @@ public abstract class Input extends ConfigBlock implements Runnable {
     return null;
   }
   
-  @Override
-  public void loadConfig(Map<String, Object> map) {
-    super.loadConfig(map);
-    String typeValue = getStringValue("type");
-    if (typeValue != null) {
-      // Explicitly add type and value to field list
-      contextFields.put("type", typeValue);
-      @SuppressWarnings("unchecked")
-      Map<String, Object> addFields = (Map<String, Object>) map.get("add_fields");
-      if (addFields == null) {
-        addFields = new HashMap<String, Object>();
-        map.put("add_fields", addFields);
-      }
-      addFields.put("type", typeValue);
-    }
+  public void loadConfig(InputDescriptor inputDescriptor) {
+    this.inputDescriptor = inputDescriptor;
+  }
+
+  public InputDescriptor getInputDescriptor() {
+    return inputDescriptor;
   }
 
   public void setType(String type) {
@@ -104,6 +93,12 @@ public abstract class Input extends ConfigBlock implements Runnable {
     this.outputManager = outputManager;
   }
 
+  public boolean isFilterRequired(FilterDescriptor filterDescriptor) {
+    Conditions conditions = filterDescriptor.getConditions();
+    Fields fields = conditions.getFields();
+    return fields.getType().contains(inputDescriptor.getType());
+  }
+
   public void addFilter(Filter filter) {
     if (firstFilter == null) {
       firstFilter = filter;
@@ -116,6 +111,22 @@ public abstract class Input extends ConfigBlock implements Runnable {
     }
   }
 
+  @SuppressWarnings("unchecked")
+  public boolean isOutputRequired(Output output) {
+    Map<String, Object> conditions = (Map<String, Object>) output.getConfigs().get("conditions");
+    if (conditions == null) {
+      return false;
+    }
+    
+    Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
+    if (fields == null) {
+      return false;
+    }
+    
+    List<String> types = (List<String>) fields.get("rowtype");
+    return types.contains(inputDescriptor.getRowtype());
+  }
+
   public void addOutput(Output output) {
     outputList.add(output);
   }
@@ -124,9 +135,9 @@ public abstract class Input extends ConfigBlock implements Runnable {
   public void init() throws Exception {
     super.init();
     initCache();
-    tail = getBooleanValue("tail", DEFAULT_TAIL);
-    useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5);
-    genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5);
+    tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), DEFAULT_TAIL);
+    useEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isUseEventMd5AsId(), DEFAULT_USE_EVENT_MD5);
+    genEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isGenEventMd5(), DEFAULT_GEN_EVENT_MD5);
 
     if (firstFilter != null) {
       firstFilter.init();
@@ -236,26 +247,26 @@ public abstract class Input extends ConfigBlock implements Runnable {
   }
 
   private void initCache() {
-    boolean cacheEnabled = getConfigValue(CACHE_ENABLED) != null
-      ? getBooleanValue(CACHE_ENABLED, DEFAULT_CACHE_ENABLED)
+    boolean cacheEnabled = inputDescriptor.isCacheEnabled() != null
+      ? inputDescriptor.isCacheEnabled()
       : LogFeederUtil.getBooleanProperty("logfeeder.cache.enabled", DEFAULT_CACHE_ENABLED);
     if (cacheEnabled) {
-      String cacheKeyField = getConfigValue(CACHE_KEY_FIELD) != null
-        ? getStringValue(CACHE_KEY_FIELD)
+      String cacheKeyField = inputDescriptor.getCacheKeyField() != null
+        ? inputDescriptor.getCacheKeyField()
         : LogFeederUtil.getStringProperty("logfeeder.cache.key.field", DEFAULT_CACHE_KEY_FIELD);
 
-      setCacheKeyField(getStringValue(cacheKeyField));
+      setCacheKeyField(cacheKeyField);
 
-      boolean cacheLastDedupEnabled = getConfigValue(CACHE_LAST_DEDUP_ENABLED) != null
-        ? getBooleanValue(CACHE_LAST_DEDUP_ENABLED, DEFAULT_CACHE_DEDUP_LAST)
+      boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null
+        ? inputDescriptor.getCacheLastDedupEnabled()
         : LogFeederUtil.getBooleanProperty("logfeeder.cache.last.dedup.enabled", DEFAULT_CACHE_DEDUP_LAST);
 
-      int cacheSize = getConfigValue(CACHE_SIZE) != null
-        ? getIntValue(CACHE_SIZE, DEFAULT_CACHE_SIZE)
+      int cacheSize = inputDescriptor.getCacheSize() != null
+        ? inputDescriptor.getCacheSize()
         : LogFeederUtil.getIntProperty("logfeeder.cache.size", DEFAULT_CACHE_SIZE);
 
-      long cacheDedupInterval = getConfigValue(CACHE_DEDUP_INTERVAL) != null
-        ? getLongValue(CACHE_DEDUP_INTERVAL, DEFAULT_CACHE_DEDUP_INTERVAL)
+      long cacheDedupInterval = inputDescriptor.getCacheDedupInterval() != null
+        ? inputDescriptor.getCacheDedupInterval()
         : Long.parseLong(LogFeederUtil.getStringProperty("logfeeder.cache.dedup.interval", String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL)));
 
       setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled));
@@ -319,6 +330,11 @@ public abstract class Input extends ConfigBlock implements Runnable {
   }
 
   @Override
+  public boolean isEnabled() {
+    return BooleanUtils.isNotFalse(inputDescriptor.isEnabled());
+  }
+
+  @Override
   public String getNameForThread() {
     if (filePath != null) {
       try {
@@ -331,7 +347,17 @@ public abstract class Input extends ConfigBlock implements Runnable {
   }
 
   @Override
+  public boolean logConfigs(Priority level) {
+    if (!super.logConfigs(level)) {
+      return false;
+    }
+    LOG.log(level, "Printing Input=" + getShortDescription());
+    LOG.log(level, "description=" + inputDescriptor.getPath());
+    return true;
+  }
+
+  @Override
   public String toString() {
     return getShortDescription();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 3737839..fc40ca4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -25,7 +25,9 @@ import java.io.FileNotFoundException;
 
 import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
 import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.solr.common.util.Base64;
 
@@ -62,7 +64,7 @@ public class InputFile extends AbstractInputFile {
 
   @Override
   void start() throws Exception {
-    boolean isProcessFile = getBooleanValue("process_file", true);
+    boolean isProcessFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getProcessFile(), true);
     if (isProcessFile) {
       if (tail) {
         processFile(logFiles[0]);
@@ -100,7 +102,7 @@ public class InputFile extends AbstractInputFile {
   }
 
   private void copyFiles(File[] files) {
-    boolean isCopyFile = getBooleanValue("copy_file", false);
+    boolean isCopyFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getCopyFile(), false);
     if (isCopyFile && files != null) {
       for (File file : files) {
         try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index f560379..4bf162b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.ambari.logfeeder.util.S3Util;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.solr.common.util.Base64;
 
@@ -78,8 +79,8 @@ public class InputS3File extends AbstractInputFile {
 
   @Override
   protected BufferedReader openLogFile(File logPathFile) throws IOException {
-    String s3AccessKey = getStringValue("s3_access_key");
-    String s3SecretKey = getStringValue("s3_secret_key");
+    String s3AccessKey = ((InputS3FileDescriptor)inputDescriptor).getS3AccessKey();
+    String s3SecretKey = ((InputS3FileDescriptor)inputDescriptor).getS3SecretKey();
     BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3AccessKey, s3SecretKey);
     fileKey = getFileKey(logPathFile);
     base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index d193cdb..5e7bdb3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.input;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -35,25 +34,23 @@ import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.filter.FilterJSON;
 import org.apache.ambari.logfeeder.output.Output;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
 import org.apache.commons.collections.MapUtils;
-import org.apache.log4j.Logger;
 import org.apache.solr.common.util.Base64;
 
 import com.google.common.base.Joiner;
 
 public class InputSimulate extends Input {
-  private static final Logger LOG = Logger.getLogger(InputSimulate.class);
-
   private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}";
   
   private static final Map<String, String> typeToFilePath = new HashMap<>();
-  private static List<String> inputTypes = new ArrayList<>();
-  public static void loadTypeToFilePath(List<Map<String, Object>> inputList) {
-    for (Map<String, Object> input : inputList) {
-      if (input.containsKey("type") && input.containsKey("path")) {
-        typeToFilePath.put((String)input.get("type"), (String)input.get("path"));
-        inputTypes.add((String)input.get("type"));
-      }
+  private static final List<String> inputTypes = new ArrayList<>();
+  public static void loadTypeToFilePath(List<InputDescriptor> inputList) {
+    for (InputDescriptor input : inputList) {
+      typeToFilePath.put(input.getType(), input.getPath());
+      inputTypes.add(input.getType());
     }
   }
   
@@ -86,7 +83,7 @@ public class InputSimulate extends Input {
     this.host = "#" + hostNumber.incrementAndGet() + "-" + LogFeederUtil.hostName;
     
     Filter filter = new FilterJSON();
-    filter.loadConfig(Collections.<String, Object> emptyMap());
+    filter.loadConfig(new FilterJsonDescriptorImpl());
     filter.setInput(this);
     addFilter(filter);
   }
@@ -141,7 +138,7 @@ public class InputSimulate extends Input {
     String type = types.get(typePos);
     String filePath = MapUtils.getString(typeToFilePath, type, "path of " + type);
     
-    configs.put("type", type);
+    ((InputDescriptorImpl)inputDescriptor).setType(type);
     setFilePath(filePath);
     
     return type;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
index 1f635af..6173f53 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
@@ -44,7 +44,7 @@ public enum FilterLogData {
   }
 
   public boolean isAllowed(Map<String, Object> jsonObj, InputMarker inputMarker) {
-    if ("audit".equals(inputMarker.input.getConfigs().get(LogFeederConstants.ROW_TYPE)))
+    if ("audit".equals(inputMarker.input.getInputDescriptor().getRowtype()))
       return true;
     
     boolean isAllowed = applyFilter(jsonObj);

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
index 96709c0..5facf76 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
@@ -21,12 +21,14 @@ package org.apache.ambari.logfeeder.mapper;
 
 import java.util.Map;
 
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+
 public abstract class Mapper {
   private String inputDesc;
   protected String fieldName;
   private String mapClassCode;
 
-  public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs);
+  public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor);
 
   protected void init(String inputDesc, String fieldName, String mapClassCode) {
     this.inputDesc = inputDesc;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index 6a7fad7..5d34c06 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -26,6 +26,8 @@ import java.util.Map;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
@@ -39,18 +41,11 @@ public class MapperDate extends Mapper {
   private SimpleDateFormat srcDateFormatter=null;
 
   @Override
-  public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
     init(inputDesc, fieldName, mapClassCode);
-    if (!(mapConfigs instanceof Map)) {
-      LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName() +
-        ", map=" + this);
-      return false;
-    }
     
-    @SuppressWarnings("unchecked")
-    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
-    String targetDateFormat = (String) mapObjects.get("target_date_pattern");
-    String srcDateFormat = (String) mapObjects.get("src_date_pattern");
+    String targetDateFormat = ((MapDateDescriptor)mapFieldDescriptor).getTargetDatePattern();
+    String srcDateFormat = ((MapDateDescriptor)mapFieldDescriptor).getSourceDatePattern();
     if (StringUtils.isEmpty(targetDateFormat)) {
       LOG.fatal("Date format for map is empty. " + this);
     } else {

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
index 39e1ff4..a463f49 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
@@ -21,6 +21,8 @@ package org.apache.ambari.logfeeder.mapper;
 
 import java.util.Map;
 
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
@@ -33,16 +35,9 @@ public class MapperFieldCopy extends Mapper {
   private String copyName = null;
 
   @Override
-  public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
     init(inputDesc, fieldName, mapClassCode);
-    if (!(mapConfigs instanceof Map)) {
-      LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
-      return false;
-    }
-    
-    @SuppressWarnings("unchecked")
-    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
-    copyName = (String) mapObjects.get("copy_name");
+    copyName = ((MapFieldCopyDescriptor)mapFieldDescriptor).getCopyName();
     if (StringUtils.isEmpty(copyName)) {
       LOG.fatal("Map copy name is empty.");
       return false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
index 9b6e83c..3f160da 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -22,6 +22,8 @@ package org.apache.ambari.logfeeder.mapper;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -35,16 +37,10 @@ public class MapperFieldName extends Mapper {
   private String newValue = null;
 
   @Override
-  public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
     init(inputDesc, fieldName, mapClassCode);
-    if (!(mapConfigs instanceof Map)) {
-      LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
-      return false;
-    }
-    
-    @SuppressWarnings("unchecked")
-    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
-    newValue = (String) mapObjects.get("new_fieldname");
+
+    newValue = ((MapFieldNameDescriptor)mapFieldDescriptor).getNewFieldName();
     if (StringUtils.isEmpty(newValue)) {
       LOG.fatal("Map field value is empty.");
       return false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
index 87cda65..03ff95b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -22,6 +22,8 @@ package org.apache.ambari.logfeeder.mapper;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -36,17 +38,11 @@ public class MapperFieldValue extends Mapper {
   private String newValue = null;
 
   @Override
-  public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
     init(inputDesc, fieldName, mapClassCode);
-    if (!(mapConfigs instanceof Map)) {
-      LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
-      return false;
-    }
     
-    @SuppressWarnings("unchecked")
-    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
-    prevValue = (String) mapObjects.get("pre_value");
-    newValue = (String) mapObjects.get("post_value");
+    prevValue = ((MapFieldValueDescriptor)mapFieldDescriptor).getPreValue();
+    newValue = ((MapFieldValueDescriptor)mapFieldDescriptor).getPostValue();;
     if (StringUtils.isEmpty(newValue)) {
       LOG.fatal("Map field value is empty.");
       return false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index bc6a553..65b9e19 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -28,11 +28,8 @@ import org.apache.ambari.logfeeder.common.ConfigBlock;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
 
 public abstract class Output extends ConfigBlock {
-  private static final Logger LOG = Logger.getLogger(Output.class);
-
   private String destination = null;
 
   protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false);

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
index fcf2695..8308a4f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
@@ -41,7 +41,7 @@ public class OutputLineFilter {
   public Boolean apply(Map<String, Object> lineMap, Input input) {
     boolean isLogFilteredOut = false;
     LRUCache inputLruCache = input.getCache();
-    if (inputLruCache != null && "service".equals(input.getConfigs().get(LogFeederConstants.ROW_TYPE))) {
+    if (inputLruCache != null && "service".equals(input.getInputDescriptor().getRowtype())) {
       String logMessage = (String) lineMap.get(input.getCacheKeyField());
       Long timestamp = null;
       if (lineMap.containsKey((LogFeederConstants.IN_MEMORY_TIMESTAMP))) {


Mime
View raw message