eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject eagle git commit: [MINOR] Add kafka log4j appender integration test cases
Date Thu, 23 Feb 2017 03:06:35 GMT
Repository: eagle
Updated Branches:
  refs/heads/master 167ec0815 -> 051dc69c5


[MINOR] Add kafka log4j appender integration test cases

* Add kafka log4j appender integration test cases `KafkaLog4jAppenderIT`

Author: Hao Chen <hao@apache.org>

Closes #838 from haoch/AddKafkaLog4jAppenderIT.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/051dc69c
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/051dc69c
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/051dc69c

Branch: refs/heads/master
Commit: 051dc69c5cc50bc354e72a4951472ae2c9a25524
Parents: 167ec08
Author: Hao Chen <hao@apache.org>
Authored: Thu Feb 23 11:06:23 2017 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Thu Feb 23 11:06:23 2017 +0800

----------------------------------------------------------------------
 eagle-external/eagle-log4jkafka/pom.xml         | 15 ++++
 .../src/test/resources/log4j.properties         |  6 +-
 .../log4j/kafka/KafkaLog4jAppenderIT.scala      | 80 ++++++++++++++++++++
 .../eagle/log4j/kafka/KafkaTestBase.scala       | 66 ++++++++++++++++
 4 files changed, 164 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/051dc69c/eagle-external/eagle-log4jkafka/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-external/eagle-log4jkafka/pom.xml b/eagle-external/eagle-log4jkafka/pom.xml
index 9bd2983..d05279f 100644
--- a/eagle-external/eagle-log4jkafka/pom.xml
+++ b/eagle-external/eagle-log4jkafka/pom.xml
@@ -51,6 +51,21 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-client</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.scalatest</groupId>
             <artifactId>scalatest_${scala.version}</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/eagle/blob/051dc69c/eagle-external/eagle-log4jkafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-external/eagle-log4jkafka/src/test/resources/log4j.properties b/eagle-external/eagle-log4jkafka/src/test/resources/log4j.properties
index 3cb8290..ffd4a21 100644
--- a/eagle-external/eagle-log4jkafka/src/test/resources/log4j.properties
+++ b/eagle-external/eagle-log4jkafka/src/test/resources/log4j.properties
@@ -23,8 +23,8 @@ log4j.appender.KAFKA.BrokerList=sandbox.hortonworks.com:6667
 log4j.appender.KAFKA.KeyClass=org.apache.eagle.log4j.kafka.hadoop.AuditLogKeyer
 log4j.appender.KAFKA.Layout=org.apache.log4j.PatternLayout
 log4j.appender.KAFKA.Layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
-log4j.appender.KAFKA.ProducerType=async
+log4j.appender.KAFKA.ProducerType=sync
 log4j.appender.KAFKA.BatchSize=1
 log4j.appender.KAFKA.QueueSize=1
-log4j.logger.eagle.kafka.producer.TestKafkaAppender$=console,KAFKA
-#log4j.logger.kafka.utils.VerifiableProperties=INFO,console
+# log4j.logger.org.apache.eagle.log4j.kafka.KafkaLog4jAppenderIT=DEBUG, console,KAFKA
+# log4j.logger.kafka.utils.VerifiableProperties=DEBUG,console
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/051dc69c/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaLog4jAppenderIT.scala
----------------------------------------------------------------------
diff --git a/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaLog4jAppenderIT.scala b/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaLog4jAppenderIT.scala
new file mode 100644
index 0000000..46f6c21
--- /dev/null
+++ b/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaLog4jAppenderIT.scala
@@ -0,0 +1,80 @@
+/*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.eagle.log4j.kafka
+
+import java.util.Properties
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.log4j.{Level, Logger}
+import org.junit.Test
+
+class KafkaLog4jAppenderIT extends KafkaTestBase {
+  val KafkaLog4jAppenderTopic = "KafkaLog4jAppender"
+  val JKafkaLog4jAppenderTopic = "JKafkaLog4jAppender"
+  val kafkaBrokerList = "localhost:" + kafkaPort
+
+  Logger.getRootLogger.setLevel(Level.ALL)
+
+  val KafkaLog4jAppenderLogger = Logger.getLogger(classOf[KafkaLog4jAppender])
+  val JKafkaLog4jAppenderLogger = Logger.getLogger(classOf[JKafkaLog4jAppender])
+
+  def createConsumerConfig(): Properties = {
+    val props = new Properties()
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList)
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
+    props.put(ConsumerConfig.SESSION_TIMEOUT_MS, "30000")
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
+    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "kafka.consumer.RoundRobinAssignor")
+    props
+  }
+
+  @Test def testKafkaLog4jAppender(): Unit = {
+    val kafkaLog4jAppender = new KafkaLog4jAppender()
+    try {
+      kafkaLog4jAppender.setName(classOf[KafkaLog4jAppender].getName)
+      kafkaLog4jAppender.setTopic(KafkaLog4jAppenderTopic)
+      kafkaLog4jAppender.setBrokerList(kafkaBrokerList)
+      kafkaLog4jAppender.setBatchSize(1)
+      kafkaLog4jAppender.setQueueSize("1")
+
+      kafkaLog4jAppender.activateOptions()
+      KafkaLog4jAppenderLogger.addAppender(kafkaLog4jAppender)
+      KafkaLog4jAppenderLogger.info("message to KafkaLog4jAppender")
+    } finally {
+      kafkaLog4jAppender.close()
+    }
+  }
+
+  @Test def testJKafkaLog4jAppender(): Unit = {
+    val jKafkaLog4jAppender = new JKafkaLog4jAppender()
+    try {
+      jKafkaLog4jAppender.setName(classOf[JKafkaLog4jAppender].getName)
+      jKafkaLog4jAppender.setTopic(JKafkaLog4jAppenderTopic)
+      jKafkaLog4jAppender.setBrokerList(kafkaBrokerList)
+      jKafkaLog4jAppender.setSyncSend(true)
+      jKafkaLog4jAppender.setRequiredNumAcks(0)
+      jKafkaLog4jAppender.activateOptions()
+      JKafkaLog4jAppenderLogger.addAppender(jKafkaLog4jAppender)
+      JKafkaLog4jAppenderLogger.info("message to JKafkaLog4jAppender")
+    } finally {
+      jKafkaLog4jAppender.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/051dc69c/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaTestBase.scala
----------------------------------------------------------------------
diff --git a/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaTestBase.scala b/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaTestBase.scala
new file mode 100644
index 0000000..b77a43d
--- /dev/null
+++ b/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaTestBase.scala
@@ -0,0 +1,66 @@
+/*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.eagle.log4j.kafka
+
+import java.util.Properties
+
+import kafka.server.{KafkaConfig, KafkaServerStartable}
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.test.{InstanceSpec, TestingServer}
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.scalatest.junit.JUnitSuite
+
+
+class KafkaTestBase extends JUnitSuite {
+  val _tempFolder = new TemporaryFolder()
+
+  @Rule def tempFolder = _tempFolder
+
+  var zkServer:TestingServer = _
+  var curatorClient:CuratorFramework = _
+  var kafkaServer:KafkaServerStartable = _
+  val kafkaPort = InstanceSpec.getRandomPort
+  val zookeeperPort = InstanceSpec.getRandomPort
+
+  @Before
+  def before(): Unit = {
+    val logDir = tempFolder.newFolder()
+    this.zkServer = new TestingServer(zookeeperPort, logDir)
+    val retryPolicy = new ExponentialBackoffRetry(1000, 3)
+    this.curatorClient = CuratorFrameworkFactory.newClient(zkServer.getConnectString, retryPolicy)
+    this.curatorClient.start()
+
+    val p: Properties = new Properties
+    p.setProperty("zookeeper.connect", zkServer.getConnectString)
+    p.setProperty("broker.id", "0")
+    p.setProperty("port", "" + kafkaPort)
+    p.setProperty("log.dirs", logDir.getAbsolutePath)
+    p.setProperty("auto.create.topics.enable", "true")
+
+    this.kafkaServer = new KafkaServerStartable(new KafkaConfig(p))
+    this.kafkaServer.startup()
+  }
+
+  @After
+  def after(): Unit = {
+    kafkaServer.shutdown()
+    curatorClient.close()
+    zkServer.close()
+  }
+}


Mime
View raw message