Author: rgoers Date: Mon Jan 2 18:52:50 2012 New Revision: 1226514 URL: http://svn.apache.org/viewvc?rev=1226514&view=rev Log: Move Flume out of core. Add support for Flume-NG Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/helpers/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/helpers/UUIDTest.java - copied, changed from r1225762, logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/UUIDTest.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/ (with props) logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/pom.xml logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Agent.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java 
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEventFactory.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/package-info.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/org/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/org/apache/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/org/apache/logging/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/org/apache/logging/log4j/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/org/apache/logging/log4j/flume/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/ (with props) logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/pom.xml logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/ 
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/Agent.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppender.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroManager.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEvent.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeEventFactory.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/package-info.java logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/ 
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/test/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppenderTest.java Removed: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/ Modified: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/pom.xml logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/pom.xml Modified: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/pom.xml URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/pom.xml?rev=1226514&r1=1226513&r2=1226514&view=diff ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/pom.xml (original) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/pom.xml Mon Jan 2 18:52:50 2012 @@ -93,18 +93,6 @@ logback-core test - - com.cloudera - flume-core - 0.9.4-cdh3u1 - true - - - org.slf4j - slf4j-log4j12 - - - ch.qos.logback logback-classic Copied: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/helpers/UUIDTest.java (from r1225762, logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/UUIDTest.java) URL: 
http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/helpers/UUIDTest.java?p2=logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/helpers/UUIDTest.java&p1=logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/UUIDTest.java&r1=1225762&r2=1226514&rev=1226514&view=diff ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/UUIDTest.java (original) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-core/src/test/java/org/apache/logging/log4j/core/helpers/UUIDTest.java Mon Jan 2 18:52:50 2012 @@ -14,7 +14,7 @@ * See the license for the specific language governing permissions and * limitations under the license. */ -package org.apache.logging.log4j.core.appender.flume; +package org.apache.logging.log4j.core.helpers; import org.apache.logging.log4j.core.helpers.UUIDUtil; import org.junit.Test; Propchange: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Mon Jan 2 18:52:50 2012 @@ -0,0 +1,2 @@ +target +*.iml Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/pom.xml URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/pom.xml?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/pom.xml (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/pom.xml Mon Jan 2 18:52:50 2012 @@ -0,0 +1,203 @@ + + + 4.0.0 + + org.apache.logging.rgoers + log4j2 + 
1.99.0-SNAPSHOT + + org.apache.logging.rgoers + log4j2-flume-ng + jar + Log4J2 Flume NG + Log4j 2.0 Flume Appender + + ${basedir}/.. + + + + org.apache.logging.rgoers + log4j2-api + + + org.apache.logging.rgoers + log4j2-core + + + org.apache.logging.rgoers + slf4j-impl + + + junit + junit + 4.7 + test + + + org.apache.flume + flume-ng-core + 1.0.0-incubating + + + org.slf4j + slf4j-log4j12 + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + always + + + + + + + + org.apache.maven.plugins + maven-changes-plugin + 2.6 + + + + changes-report + + + + + %URL%/show_bug.cgi?id=%ISSUE% + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.7 + + + ${log4j.parent.dir}/checkstyle.xml + ${log4j.parent.dir}/checkstyle-suppressions.xml + false + basedir=${basedir} + licensedir=${log4j.parent.dir}/checkstyle-header.txt + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.8 + + + false + true + + + issue + a + JIRA issue: + + + doubt + a + Troublesome: + + + compare + a + Compare with: + + + + + + non-aggregate + + javadoc + + + + + + org.codehaus.mojo + findbugs-maven-plugin + 2.3.2 + + Normal + Default + findbugs-exclude-filter.xml + + + + org.apache.maven.plugins + maven-jxr-plugin + 2.3 + + + non-aggregate + + jxr + + + + aggregate + + aggregate + + + + + + org.apache.maven.plugins + maven-pmd-plugin + + 1.5 + + + + org.codehaus.mojo + cobertura-maven-plugin + 2.2 + + + + + + + + + + + Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Agent.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Agent.java?rev=1226514&view=auto ============================================================================== --- 
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Agent.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Agent.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.flume.appender; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttr; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.status.StatusLogger; + +/** + * Agent Specification for FlumeAvroAppender. + */ +@Plugin(name = "Agent", type = "Core", printObject = true) +public final class Agent { + + private static final String DEFAULT_HOST = "localhost"; + + private static final int DEFAULT_PORT = 35853; + + private static final Logger LOGGER = StatusLogger.getLogger(); + + private final String host; + + private final int port; + + private Agent(String host, int port) { + this.host = host; + this.port = port; + } + + /** + * Retrieve the host name. 
+ * @return The name of the host. + */ + public String getHost() { + return host; + } + + /** + * Retrieve the port number. + * @return The port number. + */ + public int getPort() { + return port; + } + + @Override + public String toString() { + return "host=" + host + " port=" + port; + } + + /** + * Create an Agent. + * @param host The host name. + * @param port The port number. + * @return The Agent. + */ + @PluginFactory + public static Agent createAgent(@PluginAttr("host") String host, + @PluginAttr("port") String port) { + if (host == null) { + host = DEFAULT_HOST; + } + + int portNum; + if (port != null) { + try { + portNum = Integer.parseInt(port); + } catch (Exception ex) { + LOGGER.error("Error parsing port number " + port, ex); + return null; + } + } else { + portNum = DEFAULT_PORT; + } + return new Agent(host, portNum); + } +} Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppender.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.flume.appender; + +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AppenderBase; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttr; +import org.apache.logging.log4j.core.config.plugins.PluginElement; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.layout.RFC5424Layout; + +import java.net.InetAddress; + +/** + * An Appender that uses the Avro protocol to route events to Flume. 
+ */ +@Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true) +public final class FlumeAvroAppender extends AppenderBase implements FlumeEventFactory { + + private FlumeAvroManager manager; + + private final String mdcIncludes; + private final String mdcExcludes; + private final String mdcRequired; + + private final String eventPrefix; + + private final String mdcPrefix; + + private final boolean compressBody; + + private final String hostname; + + private final int reconnectDelay; + + private final int retries; + + private final FlumeEventFactory factory; + + private FlumeAvroAppender(String name, Filter filter, Layout layout, boolean handleException, + String hostname, String includes, String excludes, String required, String mdcPrefix, + String eventPrefix, boolean compress, int delay, int retries, + FlumeEventFactory factory, FlumeAvroManager manager) { + super(name, filter, layout, handleException); + this.manager = manager; + this.mdcIncludes = includes; + this.mdcExcludes = excludes; + this.mdcRequired = required; + this.eventPrefix = eventPrefix; + this.mdcPrefix = mdcPrefix; + this.compressBody = compress; + this.hostname = hostname; + this.reconnectDelay = delay; + this.retries = retries; + this.factory = factory == null ? this : factory; + } + + /** + * Publish the event. + * @param event The LogEvent. + */ + public void append(LogEvent event) { + + FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, + eventPrefix, compressBody); + flumeEvent.setBody(getLayout().format(flumeEvent)); + manager.send(flumeEvent, reconnectDelay, retries); + } + + @Override + public void stop() { + super.stop(); + manager.release(); + } + + /** + * Create a Flume event. + * @param event The Log4j LogEvent. + * @param includes comma separated list of mdc elements to include. + * @param excludes comma separated list of mdc elements to exclude. 
+ * @param required comma separated list of mdc elements that must be present with a value. + * @param mdcPrefix The prefix to add to MDC key names. + * @param eventPrefix The prefix to add to event fields. + * @param compress If true the body will be compressed. + * @return A Flume Event. + */ + public FlumeEvent createEvent(LogEvent event, String includes, String excludes, String required, + String mdcPrefix, String eventPrefix, boolean compress) { + return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, + eventPrefix, compressBody); + } + + /** + * Create a Flume Avro Appender. + * @param agents An array of Agents. + * @param delay The amount of time in milliseconds to wait between retries. + * @param agentRetries The number of times to retry an agent before failing to the next agent. + * @param name The name of the Appender. + * @param suppress If true exceptions will be handled in the appender. + * @param excludes A comma separated list of MDC elements to exclude. + * @param includes A comma separated list of MDC elements to include. + * @param required A comma separated list of MDC elements that are required. + * @param mdcPrefix The prefix to add to MDC key names. + * @param eventPrefix The prefix to add to event key names. + * @param compressBody If true the event body will be compressed. + * @param batchSize Number of events to include in a batch. Defaults to 1. + * @param factory The factory to use to create Flume events. + * @param layout The layout to format the event. + * @param filter A Filter to filter events. + * @return A Flume Avro Appender. 
+ */ + @PluginFactory + public static FlumeAvroAppender createAppender(@PluginElement("agents") Agent[] agents, + @PluginAttr("reconnectionDelay") String delay, + @PluginAttr("agentRetries") String agentRetries, + @PluginAttr("name") String name, + @PluginAttr("suppressExceptions") String suppress, + @PluginAttr("mdcExcludes") String excludes, + @PluginAttr("mdcIncludes") String includes, + @PluginAttr("mdcRequired") String required, + @PluginAttr("mdcPrefix") String mdcPrefix, + @PluginAttr("eventPrefix") String eventPrefix, + @PluginAttr("compress") String compressBody, + @PluginAttr("batchSize") String batchSize, + @PluginElement("flumeEventFactory") FlumeEventFactory factory, + @PluginElement("layout") Layout layout, + @PluginElement("filters") Filter filter) { + + String hostname; + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (Exception ex) { + LOGGER.error("Unable to determine local hostname", ex); + return null; + } + if (agents == null || agents.length == 0) { + LOGGER.debug("No agents provided, using defaults"); + agents = new Agent[] {Agent.createAgent(null, null)}; + } + + boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress); + boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody); + + int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize); + int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay); + int retries = agentRetries == null ? 
0 : Integer.parseInt(agentRetries); + + if (layout == null) { + layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes, + includes, required, null, null); + } + + if (name == null) { + LOGGER.error("No name provided for Appender"); + return null; + } + + FlumeAvroManager manager = FlumeAvroManager.getManager(agents, batchCount); + if (manager == null) { + return null; + } + + return new FlumeAvroAppender(name, filter, layout, handleExceptions, hostname, includes, + excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager); + } +} Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.flume.appender; + +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.NettyTransceiver; +import org.apache.avro.ipc.Transceiver; +import org.apache.avro.ipc.specific.SpecificRequestor; +import org.apache.flume.source.avro.AvroFlumeEvent; +import org.apache.flume.source.avro.AvroSourceProtocol; +import org.apache.flume.source.avro.Status; +import org.apache.logging.log4j.core.appender.AbstractManager; +import org.apache.logging.log4j.core.appender.AppenderRuntimeException; +import org.apache.logging.log4j.core.appender.ManagerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Manager for FlumeAvroAppenders. + */ +public class FlumeAvroManager extends AbstractManager { + + /** + The default reconnection delay (500 milliseconds or .5 seconds). + */ + public static final int DEFAULT_RECONNECTION_DELAY = 500; + + private static final int DEFAULT_RECONNECTS = 3; + + private static ManagerFactory factory = new AvroManagerFactory(); + + private AvroSourceProtocol client; + + private final Agent[] agents; + + private final int batchSize; + + private final EventList events = new EventList(); + + private int current = 0; + + private Transceiver transceiver; + + /** + * Constructor + * @param name The unique name of this manager. + * @param agents An array of Agents. + * @param batchSize The number of events to include in a batch. 
+ */ + protected FlumeAvroManager(String name, Agent[] agents, int batchSize) { + super(name); + this.agents = agents; + this.batchSize = batchSize; + this.client = connect(agents); + } + + /** + * Return a FlumeAvroManager. + * @param agents The agents to use. + * @param batchSize The number of events to include in a batch. + * @return A FlumeAvroManager. + */ + public static FlumeAvroManager getManager(Agent[] agents, int batchSize) { + if (agents == null || agents.length == 0) { + throw new IllegalArgumentException("At least one agent is required"); + } + + if (batchSize <= 0) { + batchSize = 1; + } + + StringBuilder sb = new StringBuilder("FlumeAvro["); + boolean first = true; + for (Agent agent : agents) { + if (!first) { + sb.append(","); + } + sb.append(agent.getHost()).append(":").append(agent.getPort()); + first = false; + } + sb.append("]"); + return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(agents, batchSize)); + } + + /** + * Return the agents. + * @return The agent array. + */ + public Agent[] getAgents() { + return agents; + } + + /** + * Returns the index of the current agent. + * @return The index for the current agent. + */ + public int getCurrent() { + return current; + } + + protected synchronized void send(FlumeEvent event, int delay, int retries) { + if (delay == 0) { + delay = DEFAULT_RECONNECTION_DELAY; + } + if (retries == 0) { + retries = DEFAULT_RECONNECTS; + } + AvroFlumeEvent avroEvent = new AvroFlumeEvent(); + avroEvent.body = ByteBuffer.wrap(event.getBody()); + avroEvent.headers = new HashMap(); + + for (Map.Entry entry : event.getHeaders().entrySet()) { + avroEvent.headers.put(entry.getKey(), entry.getValue()); + } + + List batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null; + if (batch == null && batchSize > 1) { + return; + } + + int i = 0; + + String msg = "Error writing to " + getName(); + + do { + try { + Status status = (batch == null) ? 
client.append(avroEvent) : client.appendBatch(batch); + if (!status.equals(Status.OK)) { + throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() + + ":" + agents[current].getPort()); + } + return; + } catch (Exception ex) { + if (i == retries - 1) { + msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" + + agents[current].getPort(); + LOGGER.warn(msg, ex); + break; + } + sleep(delay); + } + } while (++i < retries); + + for (int index = 0; index < agents.length; ++index) { + if (index == current) { + continue; + } + Agent agent = agents[index]; + i = 0; + do { + try { + transceiver = null; + AvroSourceProtocol c = connect(agent.getHost(), agent.getPort()); + Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch); + if (!status.equals(Status.OK)) { + if (i == retries - 1) { + String warnMsg = "RPC communication failed to " + getName() + " at " + + agent.getHost() + ":" + agent.getPort(); + LOGGER.warn(warnMsg); + } + continue; + } + client = c; + current = i; + return; + } catch (Exception ex) { + if (i == retries - 1) { + String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" + + agent.getPort(); + LOGGER.warn(warnMsg, ex); + break; + } + sleep(delay); + } + } while (++i < retries); + } + + throw new AppenderRuntimeException(msg); + + } + + private void sleep(int delay) { + try { + Thread.sleep(delay); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + /** + * There is a very good chance that this will always return the first agent even if it isn't available. + * @param agents The list of agents to choose from + * @return The AvroSourceProtocol client for the first agent that could be connected. 
+ */ + private AvroSourceProtocol connect(Agent[] agents) { + int i = 0; + for (Agent agent : agents) { + AvroSourceProtocol server = connect(agent.getHost(), agent.getPort()); + if (server != null) { + current = i; + return server; + } + ++i; + } + throw new AppenderRuntimeException("Unable to connect to any agents"); + } + + private AvroSourceProtocol connect(String hostname, int port) { + try { + if (transceiver == null) { + transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port)); + } + } catch (IOException ioe) { + LOGGER.error("Unable to create transceiver", ioe); + return null; + } + try { + return SpecificRequestor.getClient(AvroSourceProtocol.class, transceiver); + } catch (IOException ioe) { + LOGGER.error("Unable to create Avro client"); + return null; + } + } + + @Override + protected void releaseSub() { + if (transceiver != null) { + try { + transceiver.close(); + } catch (IOException ioe) { + LOGGER.error("Attempt to clean up Avro transceiver failed", ioe); + } + } + client = null; + } + + /** + * Thread-safe List management of a batch. + */ + private static class EventList extends ArrayList { + + public synchronized List addAndGet(AvroFlumeEvent event, int batchSize) { + super.add(event); + if (this.size() >= batchSize) { + List events = new ArrayList(); + events.addAll(this); + clear(); + return events; + } else { + return null; + } + } + } + + /** + * Factory data. + */ + private static class FactoryData { + private Agent[] agents; + private int batchSize; + + /** + * Constructor. + * @param agents The agents. + * @param batchSize The number of events to include in a batch. + */ + public FactoryData(Agent[] agents, int batchSize) { + this.agents = agents; + this.batchSize = batchSize; + } + } + + /** + * Avro Manager Factory. + */ + private static class AvroManagerFactory implements ManagerFactory { + + /** + * Create the FlumeAvroManager. + * @param name The name of the entity to manage. 
+ * @param data The data required to create the entity. + * @return The FlumeAvroManager. + */ + public FlumeAvroManager createManager(String name, FactoryData data) { + try { + + return new FlumeAvroManager(name, data.agents, data.batchSize); + } catch (Exception ex) { + LOGGER.error("Could not create FlumeAvroManager", ex); + } + return null; + } + } + +} Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ +package org.apache.logging.log4j.flume.appender; + +import org.apache.flume.event.SimpleEvent; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LoggingException; +import org.apache.logging.log4j.Marker; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.helpers.UUIDUtil; +import org.apache.logging.log4j.message.MapMessage; +import org.apache.logging.log4j.message.Message; +import org.apache.logging.log4j.message.StructuredDataId; +import org.apache.logging.log4j.message.StructuredDataMessage; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; +import java.util.zip.GZIPOutputStream; + +/** + * Class that is both a Flume and Log4j Event. + */ +public class FlumeEvent extends SimpleEvent implements LogEvent { + + private static final String DEFAULT_MDC_PREFIX = "mdc:"; + + private static final String DEFAULT_EVENT_PREFIX = ""; + + private static final String EVENT_TYPE = "eventType"; + + private static final String EVENT_ID = "eventId"; + + private static final String GUID = "guId"; + + private final LogEvent event; + + private final Map ctx = new HashMap(); + + private final boolean compress; + + /** + * Construct the FlumeEvent. + * @param event The Log4j LogEvent. + * @param includes A comma separated list of MDC elements to include. + * @param excludes A comma separated list of MDC elements to exclude. + * @param required A comma separated list of MDC elements that are required to be defined. + * @param mdcPrefix The value to prefix to MDC keys. + * @param eventPrefix The value to prefix to event keys. + * @param compress If true the event body should be compressed. 
+ */ + public FlumeEvent(LogEvent event, String includes, String excludes, String required, + String mdcPrefix, String eventPrefix, boolean compress) { + this.event = event; + this.compress = compress; + Map headers = getHeaders(); + if (mdcPrefix == null) { + mdcPrefix = DEFAULT_MDC_PREFIX; + } + if (eventPrefix == null) { + eventPrefix = DEFAULT_EVENT_PREFIX; + } + Map mdc = event.getContextMap(); + if (includes != null) { + String[] array = includes.split(","); + if (array.length > 0) { + for (String str : array) { + if (mdc.containsKey(str)) { + ctx.put(str, mdc.get(str)); + } + } + } + } else if (excludes != null) { + String[] array = excludes.split(","); + if (array.length > 0) { + List list = Arrays.asList(array); + for (Map.Entry entry : mdc.entrySet()) { + if (!list.contains(entry.getKey())) { + ctx.put(entry.getKey(), entry.getValue()); + } + } + } + } + + if (required != null) { + String[] array = required.split(","); + if (array.length > 0) { + for (String str : array) { + if (!mdc.containsKey(str)) { + throw new LoggingException("Required key " + str + " is missing from the MDC"); + } + } + } + } + Message message = event.getMessage(); + if (message instanceof MapMessage) { + if (message instanceof StructuredDataMessage) { + addStructuredData(eventPrefix, headers, (StructuredDataMessage) message); + } + addMapData(eventPrefix, headers, (MapMessage) message); + } + + addContextData(mdcPrefix, headers, ctx); + + addGuid(headers); + } + + protected void addStructuredData(String prefix, Map fields, StructuredDataMessage msg) { + fields.put(prefix + EVENT_TYPE, msg.getType()); + StructuredDataId id = msg.getId(); + fields.put(prefix + EVENT_ID, id.getName()); + } + + protected void addMapData(String prefix, Map fields, MapMessage msg) { + Map data = msg.getData(); + for (Map.Entry entry : data.entrySet()) { + fields.put(prefix + entry.getKey(), entry.getValue()); + } + } + + protected void addContextData(String prefix, Map fields, Map context) { + for 
(Map.Entry entry : context.entrySet()) { + fields.put(prefix + entry.getKey(), entry.getValue()); + } + } + + protected void addGuid(Map fields) { + fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString()); + } + + /** + * Set the body in the event. + * @param body The body to add to the event. + */ + public void setBody(byte[] body) { + if (body == null || body.length == 0) { + super.setBody(new byte[0]); + return; + } + if (compress) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + GZIPOutputStream os = new GZIPOutputStream(baos); + os.write(body); + os.close(); + } catch (IOException ioe) { + throw new LoggingException("Unable to compress message", ioe); + } + super.setBody(baos.toByteArray()); + } else { + super.setBody(body); + } + } + + /** + * Get the Frequently Qualified Class Name. + * @return the FQCN String. + */ + public String getFQCN() { + return event.getFQCN(); + } + + /** + * Return the logging Level. + * @return the Level. + */ + public Level getLevel() { + return event.getLevel(); + } + + /** + * Return the logger name. + * @return the logger name. + */ + public String getLoggerName() { + return event.getLoggerName(); + } + + /** + * Return the StackTraceElement for the caller of the logging API. + * @return the StackTraceElement of the caller. + */ + public StackTraceElement getSource() { + return event.getSource(); + } + + /** + * Return the Message. + * @return the Message. + */ + public Message getMessage() { + return event.getMessage(); + } + + /** + * Return the Marker. + * @return the Marker. + */ + public Marker getMarker() { + return event.getMarker(); + } + + /** + * Return the name of the Thread. + * @return the name of the Thread. + */ + public String getThreadName() { + return event.getThreadName(); + } + + /** + * Return the event timestamp. + * @return the event timestamp. + */ + public long getMillis() { + return event.getMillis(); + } + + /** + * Return the Throwable associated with the event, if any. 
+ * @return the Throwable. + */ + public Throwable getThrown() { + return event.getThrown(); + } + + /** + * Return a copy of the context Map. + * @return a copy of the context Map. + */ + public Map getContextMap() { + return ctx; + } + + /** + * Return a copy of the context stack. + * @return a copy of the context stack. + */ + public Stack getContextStack() { + return event.getContextStack(); + } +} Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEventFactory.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEventFactory.java?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEventFactory.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEventFactory.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ +package org.apache.logging.log4j.flume.appender; + +import org.apache.logging.log4j.core.LogEvent; + +/** + * Factory to create Flume events. + */ +public interface FlumeEventFactory { + /** + * Create a Flume event. + * @param event The Log4j LogEvent. + * @param includes A comma separated list of MDC elements to include. + * @param excludes A comma separated list of MDC elements to exclude. + * @param required A comma separated list of MDC elements that are required. + * @param mdcPrefix The value to prefix to MDC keys. + * @param eventPrefix The value to prefix to event keys. + * @param compress If true the event body should be compressed. + * @return A FlumeEvent. + */ + FlumeEvent createEvent(LogEvent event, String includes, String excludes, String required, + String mdcPrefix, String eventPrefix, boolean compress); +} Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/package-info.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/package-info.java?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/package-info.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/package-info.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +/** + * Apache Flume Appender. Requires the user specifically include Flume and its dependencies. + */ +package org.apache.logging.log4j.flume.appender; Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAvroAppenderTest.java Mon Jan 2 18:52:50 2012 @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.flume.appender; + +import org.apache.flume.Channel; +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.Event; + +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.lifecycle.LifecycleController; +import org.apache.flume.lifecycle.LifecycleState; +import org.apache.flume.source.AvroSource; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.Logger; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.plugins.PluginManager; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.zip.GZIPInputStream; + +/** + * + */ +public class FlumeAvroAppenderTest { + + private static LoggerContext ctx; + + private static final int testServerPort = 12345; + + private AvroSource eventSource; + private Channel channel; + private Logger avroLogger; + + private String testPort; + + @BeforeClass + public static void setupClass() { + PluginManager.addPackage("org.apache.logging.log4j.flume"); + ctx = (LoggerContext) LogManager.getContext(); + } 
+ + @AfterClass + public static void cleanupClass() { + + } + + @Before + public void setUp() throws Exception { + eventSource = new AvroSource(); + channel = new MemoryChannel(); + + Configurables.configure(channel, new Context()); + + eventSource.setChannel(channel); + + avroLogger = (Logger) LogManager.getLogger("avrologger"); + /* + * Clear out all other appenders associated with this logger to ensure we're + * only hitting the Avro appender. + */ + removeAppenders(avroLogger); + boolean bound = false; + + for (int i = 0; i < 100 && !bound; i++) { + try { + Context context = new Context(); + testPort = String.valueOf(testServerPort + i); + context.put("port", testPort); + context.put("bind", "0.0.0.0"); + + Configurables.configure(eventSource, context); + + eventSource.start(); + bound = true; + } catch (ChannelException e) { + + } + } + Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf( + eventSource, LifecycleState.START_OR_ERROR)); + Assert.assertEquals("Server is started", LifecycleState.START, eventSource.getLifecycleState()); + } + + @After + public void teardown() throws Exception { + removeAppenders(avroLogger); + eventSource.stop(); + Assert.assertTrue("Reached stop or error", + LifecycleController.waitForOneOf(eventSource, LifecycleState.STOP_OR_ERROR)); + Assert.assertEquals("Server is stopped", LifecycleState.STOP, + eventSource.getLifecycleState()); + } + + @Test + public void testLog4jAvroAppender() throws InterruptedException, IOException { + Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)}; + FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null, + null, null, null, null, "true", "1", null, null, null); + avroAppender.start(); + avroLogger.addAppender(avroAppender); + avroLogger.setLevel(Level.ALL); + + Assert.assertNotNull(avroLogger); + + avroLogger.info("Test message"); + + Transaction transaction = channel.getTransaction(); + 
transaction.begin(); + + Event event = channel.take(); + Assert.assertNotNull(event); + Assert.assertTrue("Channel contained event, but not expected message", + getBody(event).endsWith("Test message")); + transaction.commit(); + transaction.close(); + + eventSource.stop(); + } + + + @Test + public void testMultiple() throws InterruptedException, IOException { + Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)}; + FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null, + null, null, null, null, "true", "1", null, null, null); + avroAppender.start(); + avroLogger.addAppender(avroAppender); + avroLogger.setLevel(Level.ALL); + + Assert.assertNotNull(avroLogger); + + for (int i = 0; i < 10; ++i) { + avroLogger.info("Test message " + i); + } + + for (int i = 0; i < 10; ++i) { + Transaction transaction = channel.getTransaction(); + transaction.begin(); + + Event event = channel.take(); + Assert.assertNotNull(event); + Assert.assertTrue("Channel contained event, but not expected message", + getBody(event).endsWith("Test message " + i)); + transaction.commit(); + transaction.close(); + } + + eventSource.stop(); + } + + @Test + public void testBatch() throws InterruptedException, IOException { + Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)}; + FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null, + null, null, null, null, "true", "10", null, null, null); + avroAppender.start(); + avroLogger.addAppender(avroAppender); + avroLogger.setLevel(Level.ALL); + + Assert.assertNotNull(avroLogger); + + for (int i = 0; i < 10; ++i) { + avroLogger.info("Test message " + i); + } + + Transaction transaction = channel.getTransaction(); + transaction.begin(); + + for (int i = 0; i < 10; ++i) { + Event event = channel.take(); + Assert.assertNotNull("No event for item " + i, event); + Assert.assertTrue("Channel contained event, but not 
expected message", + getBody(event).endsWith("Test message " + i)); + } + transaction.commit(); + transaction.close(); + + eventSource.stop(); + } + + + @Test + public void testConnectionRefused() { + Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)}; + FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null, + null, null, null, null, "true", "1", null, null, null); + avroAppender.start(); + avroLogger.addAppender(avroAppender); + avroLogger.setLevel(Level.ALL); + eventSource.stop(); + + boolean caughtException = false; + + try { + avroLogger.info("message 1"); + } catch (Throwable t) { + //logger.debug("Logging to a non-existant server failed (as expected)", t); + + caughtException = true; + } + + Assert.assertTrue(caughtException); + } + + + + @Test + public void testReconnect() throws Exception { + String altPort = Integer.toString(Integer.parseInt(testPort) + 1); + Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort), + Agent.createAgent("localhost", altPort)}; + FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null, + null, null, null, null, "true", "1", null, null, null); + avroAppender.start(); + avroLogger.addAppender(avroAppender); + avroLogger.setLevel(Level.ALL); + + avroLogger.info("Test message"); + + Transaction transaction = channel.getTransaction(); + transaction.begin(); + + Event event = channel.take(); + Assert.assertNotNull(event); + Assert.assertTrue("Channel contained event, but not expected message", + getBody(event).endsWith("Test message")); + transaction.commit(); + transaction.close(); + + eventSource.stop(); + try { + Context context = new Context(); + context.put("port", altPort); + context.put("bind", "0.0.0.0"); + + Configurables.configure(eventSource, context); + + eventSource.start(); + } catch (ChannelException e) { + Assert.fail("Caught exception while resetting port to " + altPort + " : 
" + e.getMessage()); + } + + avroLogger.info("Test message 2"); + + transaction = channel.getTransaction(); + transaction.begin(); + + event = channel.take(); + Assert.assertNotNull(event); + Assert.assertTrue("Channel contained event, but not expected message", + getBody(event).endsWith("Test message 2")); + transaction.commit(); + transaction.close(); + } + + + + private void removeAppenders(Logger logger) { + Map map = logger.getAppenders(); + for (Map.Entry entry : map.entrySet()) { + Appender app = entry.getValue(); + avroLogger.removeAppender(app); + app.stop(); + } + } + + private Appender getAppender(Logger logger, String name) { + Map map = logger.getAppenders(); + return map.get(name); + } + + private String getBody(Event event) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody())); + int n = 0; + while (-1 != (n = is.read())) { + baos.write(n); + } + return new String(baos.toByteArray()); + + } +} Propchange: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Mon Jan 2 18:52:50 2012 @@ -0,0 +1,2 @@ +target +*.iml Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/pom.xml URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/pom.xml?rev=1226514&view=auto ============================================================================== --- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/pom.xml (added) +++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/pom.xml Mon Jan 2 18:52:50 2012 @@ -0,0 +1,210 @@ + + + 4.0.0 + + org.apache.logging.rgoers + log4j2 + 1.99.0-SNAPSHOT + + org.apache.logging.rgoers + log4j2-flume-og + jar + Log4J2 Flume OG + Log4j 2.0 Flume (Original) Appender + + ${basedir}/.. 
+ + + + org.apache.logging.rgoers + log4j2-api + + + org.apache.logging.rgoers + log4j2-core + + + org.apache.logging.rgoers + slf4j-impl + + + junit + junit + 4.7 + test + + + com.cloudera + flume-core + 0.9.4-cdh3u1 + true + + + org.slf4j + slf4j-log4j12 + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + always + + + + + + + + org.apache.maven.plugins + maven-changes-plugin + 2.6 + + + + changes-report + + + + + %URL%/show_bug.cgi?id=%ISSUE% + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.7 + + + ${log4j.parent.dir}/checkstyle.xml + ${log4j.parent.dir}/checkstyle-suppressions.xml + false + basedir=${basedir} + licensedir=${log4j.parent.dir}/checkstyle-header.txt + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.8 + + + false + true + + + issue + a + JIRA issue: + + + doubt + a + Troublesome: + + + compare + a + Compare with: + + + + + + non-aggregate + + javadoc + + + + + + org.codehaus.mojo + findbugs-maven-plugin + 2.3.2 + + Normal + Default + findbugs-exclude-filter.xml + + + + org.apache.maven.plugins + maven-jxr-plugin + 2.3 + + + non-aggregate + + jxr + + + + aggregate + + aggregate + + + + + + org.apache.maven.plugins + maven-pmd-plugin + + 1.5 + + + + org.codehaus.mojo + cobertura-maven-plugin + 2.2 + + + + + + + + + + + + cloudera + https://repository.cloudera.com/content/repositories/releases/ + + + + Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/Agent.java URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/Agent.java?rev=1226514&view=auto ============================================================================== --- 
logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/Agent.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/Agent.java Mon Jan 2 18:52:50 2012
@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.flumeog.appender;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttr;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.status.StatusLogger;

/**
 * Agent Specification for FlumeAvroAppender.
 */
@Plugin(name = "Agent", type = "Core", printObject = true)
public final class Agent {

    private static final String DEFAULT_HOST = "localhost";

    private static final int DEFAULT_PORT = 35853;

    private static final Logger LOGGER = StatusLogger.getLogger();

    private final String host;

    private final int port;

    private Agent(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Retrieve the host name.
     * @return The name of the host.
     */
    public String getHost() {
        return host;
    }

    /**
     * Retrieve the port number.
     * @return The port number.
     */
    public int getPort() {
        return port;
    }

    @Override
    public String toString() {
        return "host=" + host + " port=" + port;
    }

    /**
     * Create an Agent.
     * @param host The host name; "localhost" is used when null.
     * @param port The port number; 35853 is used when null.
     * @return The Agent, or null if the port is not a valid integer (the error is logged).
     */
    @PluginFactory
    public static Agent createAgent(@PluginAttr("host") String host,
                                    @PluginAttr("port") String port) {
        if (host == null) {
            host = DEFAULT_HOST;
        }

        int portNum;
        if (port != null) {
            try {
                portNum = Integer.parseInt(port);
            } catch (NumberFormatException ex) {
                LOGGER.error("Error parsing port number " + port, ex);
                return null;
            }
        } else {
            portNum = DEFAULT_PORT;
        }
        return new Agent(host, portNum);
    }
}
Added: logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppender.java
URL: http://svn.apache.org/viewvc/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppender.java?rev=1226514&view=auto
==============================================================================
--- logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppender.java (added)
+++ logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers/log4j2-flume-og/src/main/java/org/apache/logging/log4j/flumeog/appender/FlumeAvroAppender.java Mon Jan 2 18:52:50 2012
@@ -0,0 +1,181 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.flumeog.appender;

import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AppenderBase;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttr;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.RFC5424Layout;

import java.net.InetAddress;

/**
 * An Appender that uses the Avro protocol to route events to Flume.
 */
@Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
public final class FlumeAvroAppender extends AppenderBase implements FlumeEventFactory {

    private FlumeAvroManager manager;

    private final String mdcIncludes;
    private final String mdcExcludes;
    private final String mdcRequired;

    private final String eventPrefix;

    private final String mdcPrefix;

    private final boolean compressBody;

    private final String hostname;

    private final int reconnectDelay;

    private final int retries;

    /** The factory used by append; this appender itself when none was configured. */
    private final FlumeEventFactory factory;

    private FlumeAvroAppender(String name, Filter filter, Layout layout, boolean handleException,
                              String hostname, String includes, String excludes, String required, String mdcPrefix,
                              String eventPrefix, boolean compress, int delay, int retries,
                              FlumeEventFactory factory, FlumeAvroManager manager) {
        super(name, filter, layout, handleException);
        this.manager = manager;
        this.mdcIncludes = includes;
        this.mdcExcludes = excludes;
        this.mdcRequired = required;
        this.eventPrefix = eventPrefix;
        this.mdcPrefix = mdcPrefix;
        this.compressBody = compress;
        this.hostname = hostname;
        this.reconnectDelay = delay;
        this.retries = retries;
        this.factory = factory == null ? this : factory;
    }

    /**
     * Publish the event: wrap it in a FlumeEvent, format it with the layout and send it through the manager.
     * @param event The LogEvent.
     */
    public void append(LogEvent event) {

        FlumeEvent flumeEvent = factory.createEvent(event, hostname, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
            eventPrefix, compressBody);
        flumeEvent.setBody(getLayout().format(flumeEvent));
        manager.send(flumeEvent, reconnectDelay, retries);
    }

    /**
     * Stop the appender and release the manager.
     */
    @Override
    public void stop() {
        super.stop();
        manager.release();
    }

    /**
     * Create a Flume event from the supplied arguments.
     * @param event The Log4j LogEvent.
     * @param hostname The host name.
     * @param includes comma separated list of mdc elements to include.
     * @param excludes comma separated list of mdc elements to exclude.
     * @param required comma separated list of mdc elements that must be present with a value.
     * @param mdcPrefix The prefix to add to MDC key names.
     * @param eventPrefix The prefix to add to event fields.
     * @param compress If true the body will be compressed.
     * @return A Flume Event.
     */
    public FlumeEvent createEvent(LogEvent event, String hostname, String includes, String excludes, String required,
                      String mdcPrefix, String eventPrefix, boolean compress) {
        // Use the arguments rather than this appender's fields so the factory contract is honoured
        // for every caller; append() passes the fields, so its events are unchanged.
        return new FlumeEvent(event, hostname, includes, excludes, required, mdcPrefix,
            eventPrefix, compress);
    }

    /**
     * Create a Flume Avro Appender.
     * @param agents An array of Agents; a single default Agent is used when null or empty.
     * @param delay The amount of time in milliseconds to wait between retries; 0 when null.
     * @param agentRetries The number of times to retry an agent before failing to the next agent; 0 when null.
     * @param name The name of the Appender.
     * @param suppress If true exceptions will be handled in the appender; true when null.
     * @param excludes A comma separated list of MDC elements to exclude.
     * @param includes A comma separated list of MDC elements to include.
     * @param required A comma separated list of MDC elements that are required.
     * @param mdcPrefix The prefix to add to MDC key names.
     * @param eventPrefix The prefix to add to event key names.
     * @param compressBody If true the event body will be compressed; true when null.
     * @param factory The factory to use to create Flume events.
     * @param layout The layout to format the event; an RFC5424Layout is created when null.
     * @param filter A Filter to filter events.
     * @return A Flume Avro Appender, or null if the configuration is unusable (the reason is logged).
     */
    @PluginFactory
    public static FlumeAvroAppender createAppender(@PluginElement("agents") Agent[] agents,
                                                   @PluginAttr("reconnectionDelay") String delay,
                                                   @PluginAttr("agentRetries") String agentRetries,
                                                   @PluginAttr("name") String name,
                                                   @PluginAttr("suppressExceptions") String suppress,
                                                   @PluginAttr("mdcExcludes") String excludes,
                                                   @PluginAttr("mdcIncludes") String includes,
                                                   @PluginAttr("mdcRequired") String required,
                                                   @PluginAttr("mdcPrefix") String mdcPrefix,
                                                   @PluginAttr("eventPrefix") String eventPrefix,
                                                   @PluginAttr("compress") String compressBody,
                                                   @PluginElement("flumeEventFactory") FlumeEventFactory factory,
                                                   @PluginElement("layout") Layout layout,
                                                   @PluginElement("filters") Filter filter) {

        String hostname;
        try {
            hostname = InetAddress.getLocalHost().getHostName();
        } catch (Exception ex) {
            LOGGER.error("Unable to determine local hostname", ex);
            return null;
        }
        if (agents == null || agents.length == 0) {
            LOGGER.debug("No agents provided, using defaults");
            agents = new Agent[] {Agent.createAgent(null, null)};
        }

        boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
        boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);

        // A malformed number is a configuration error: log it and give up, as Agent.createAgent
        // does for a bad port, instead of letting NumberFormatException escape the factory.
        int reconnectDelay;
        int retries;
        try {
            reconnectDelay = delay == null ? 0 : Integer.parseInt(delay);
            retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
        } catch (NumberFormatException ex) {
            LOGGER.error("Error parsing reconnectionDelay " + delay + " or agentRetries " + agentRetries, ex);
            return null;
        }

        if (layout == null) {
            layout = RFC5424Layout.createLayout(null, null, null, null, "True", null, null, null, null, excludes,
                includes, required, null);
        }

        if (name == null) {
            LOGGER.error("No name provided for Appender");
            return null;
        }

        FlumeAvroManager manager = FlumeAvroManager.getManager(agents);
        if (manager == null) {
            return null;
        }

        return new FlumeAvroAppender(name, filter, layout, handleExceptions, hostname, includes,
            excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
    }
}