ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stoa...@apache.org
Subject [50/51] [abbrv] ambari git commit: Use EventBus instead of BufferedAuditLogger custom implementation
Date Thu, 24 Mar 2016 12:09:22 GMT
Use EventBus instead of BufferedAuditLogger custom implementation


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0a1e572e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0a1e572e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0a1e572e

Branch: refs/heads/audit_logging
Commit: 0a1e572e9ce9f9b5eb2a7710c5416fbf97e1a151
Parents: 1678b34
Author: Daniel Gergely <dgergely@hortonworks.com>
Authored: Wed Mar 23 15:22:49 2016 +0100
Committer: Toader, Sebastian <stoader@hortonworks.com>
Committed: Thu Mar 24 13:06:50 2016 +0100

----------------------------------------------------------------------
 .../ambari/server/audit/AsyncAuditLogger.java   |  85 +++++++++
 .../server/audit/AuditLoggerDefaultImpl.java    |   2 +
 .../ambari/server/audit/AuditLoggerModule.java  |   8 +-
 .../server/audit/BufferedAuditLogger.java       | 143 ---------------
 .../server/audit/BufferedAuditLoggerTest.java   | 174 -------------------
 5 files changed, 90 insertions(+), 322 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/0a1e572e/ambari-server/src/main/java/org/apache/ambari/server/audit/AsyncAuditLogger.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/audit/AsyncAuditLogger.java
b/ambari-server/src/main/java/org/apache/ambari/server/audit/AsyncAuditLogger.java
new file mode 100644
index 0000000..ac96391
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/audit/AsyncAuditLogger.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.audit;
+
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.ambari.server.audit.event.AuditEvent;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+
+/**
+ * This is a wrapper for an audit log implementation that uses {@link EventBus} to make audit
logging asynchronous
+ */
+@Singleton
+class AsyncAuditLogger implements AuditLogger {
+  /**
+   * Name for guice injection
+   */
+  final static String InnerLogger = "AsyncAuditLogger";
+
+  /**
+   * Event bus that holds audit event objects
+   */
+  private EventBus eventBus;
+
+  /**
+   * Constructor.
+   *
+   * @param auditLogger the audit logger to use
+   */
+  @Inject
+  public AsyncAuditLogger(@Named(InnerLogger) AuditLogger auditLogger) {
+    eventBus = new AsyncEventBus("AuditLoggerEventBus", new ThreadPoolExecutor(0, 1, 5L,
TimeUnit.MINUTES,
+      new LinkedBlockingQueue<Runnable>(), new AuditLogThreadFactory(),
+      new ThreadPoolExecutor.CallerRunsPolicy()));
+    eventBus.register(auditLogger);
+  }
+
+  @Override
+  public void log(AuditEvent event) {
+    eventBus.post(event);
+  }
+
+  /**
+   * A custom {@link ThreadFactory} for the threads that logs audit events
+   */
+  private static final class AuditLogThreadFactory implements ThreadFactory {
+
+    private static final AtomicInteger nextId = new AtomicInteger(1);
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Thread newThread(Runnable runnable) {
+      Thread thread = new Thread(runnable, "auditlog-" + nextId.getAndIncrement());
+      thread.setDaemon(false);
+      return thread;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a1e572e/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerDefaultImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerDefaultImpl.java
b/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerDefaultImpl.java
index adac54a..16d568d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerDefaultImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerDefaultImpl.java
@@ -26,6 +26,7 @@ import org.apache.ambari.server.audit.event.AuditEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.Singleton;
 
 /**
@@ -40,6 +41,7 @@ public class AuditLoggerDefaultImpl implements AuditLogger {
    * {@inheritDoc}
    */
   @Override
+  @Subscribe
   public void log(AuditEvent event) {
     Date date = new Date(event.getTimestamp());
     //2016-03-11T10:42:36.376Z

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a1e572e/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerModule.java
b/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerModule.java
index 876c4d9..b20714b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerModule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerModule.java
@@ -46,10 +46,8 @@ import org.apache.ambari.server.audit.request.eventcreator.UserEventCreator;
 import org.apache.ambari.server.audit.request.eventcreator.ValidationIgnoreEventCreator;
 import org.apache.ambari.server.audit.request.eventcreator.ViewInstanceEventCreator;
 import org.apache.ambari.server.audit.request.eventcreator.ViewPrivilegeEventCreator;
-import org.apache.ambari.server.configuration.Configuration;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
 import com.google.inject.multibindings.Multibinder;
 import com.google.inject.name.Names;
 
@@ -57,10 +55,10 @@ public class AuditLoggerModule extends AbstractModule {
 
   @Override
   protected void configure() {
-    bind(AuditLogger.class).to(BufferedAuditLogger.class);
+    bind(AuditLogger.class).to(AsyncAuditLogger.class);
 
-    // set AuditLoggerDefaultImpl to be used by BufferedAuditLogger
-    bind(AuditLogger.class).annotatedWith(Names.named(BufferedAuditLogger.InnerLogger)).to(AuditLoggerDefaultImpl.class);
+    // set AuditLoggerDefaultImpl to be used by AsyncAuditLogger
+    bind(AuditLogger.class).annotatedWith(Names.named(AsyncAuditLogger.InnerLogger)).to(AuditLoggerDefaultImpl.class);
 
     // binding for audit event creators
     Multibinder<RequestAuditEventCreator> auditLogEventCreatorBinder = Multibinder.newSetBinder(binder(),
RequestAuditEventCreator.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a1e572e/ambari-server/src/main/java/org/apache/ambari/server/audit/BufferedAuditLogger.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/audit/BufferedAuditLogger.java
b/ambari-server/src/main/java/org/apache/ambari/server/audit/BufferedAuditLogger.java
deleted file mode 100644
index e11bfd5..0000000
--- a/ambari-server/src/main/java/org/apache/ambari/server/audit/BufferedAuditLogger.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.audit;
-
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.ambari.server.audit.event.AuditEvent;
-import org.apache.ambari.server.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.google.inject.name.Named;
-
-/**
- * This is a decorator that adds buffering and running on separate thread (instead of the
tread of the caller)
- * to an existing audit logger implementation.
- * It uses a bounded queue for holding audit events before they are logged.
- */
-@Singleton
-public class BufferedAuditLogger implements AuditLogger {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BufferedAuditLogger.class);
-
-  /**
-   * Capacity of the buffer
-   */
-  private final int bufferCapacity;
-
-  /**
-   * Buffer capacity status
-   */
-  private final double capacityWaterMark;
-
-  /**
-   * Wrapped audit logger
-   */
-  private final AuditLogger auditLogger;
-
-  /**
-   * Thread pool
-   */
-  private final ExecutorService pool;
-
-  /**
-   * Names for guice injection
-   */
-  public final static String InnerLogger = "BufferedAuditLogger";
-  public final static String Capacity = "BufferedAuditLogger.capacity";
-
-
-  /**
-   * The queue that holds the audit events to be logged in case there are
-   * peeks when the producers logs audit events at a higher pace than
-   * this audit logger can consume.
-   */
-  protected final BlockingQueue<AuditEvent> auditEventWorkQueue;
-
-  private class Consumer implements Runnable {
-    @Override
-    public void run() {
-      while (true) {
-        try {
-          AuditEvent auditEvent = auditEventWorkQueue.take();
-          auditLogger.log(auditEvent);
-        } catch (InterruptedException ex) {
-          LOG.error("Logging of audit events interrupted ! There are {} audit events left
unlogged !", auditEventWorkQueue.size());
-
-          pool.shutdownNow();
-          Thread.currentThread().interrupt();
-
-        } catch (Exception ex) {
-          LOG.error("Error caught during logging audit events: " + ex);
-        }
-
-      }
-    }
-  }
-
-
-  /**
-   * Constructor.
-   *
-   * @param auditLogger the audit logger to extend
-   */
-  @Inject
-  public BufferedAuditLogger(@Named(InnerLogger) AuditLogger auditLogger, Configuration configuration)
{
-    this.bufferCapacity = configuration.getBufferedAuditLoggerCapacity();
-    this.capacityWaterMark = 0.2; // 20 percent of full capacity
-
-    this.auditEventWorkQueue = new LinkedBlockingQueue<>(bufferCapacity);
-    this.auditLogger = auditLogger;
-
-    this.pool = Executors.newSingleThreadExecutor();
-    pool.execute(new Consumer());
-
-  }
-
-
-  /**
-   * Logs audit log events
-   *
-   * @param event
-   */
-  @Override
-  public void log(final AuditEvent event) {
-
-    try {
-
-      this.auditEventWorkQueue.put(event);
-
-      int remainingCapacity = this.auditEventWorkQueue.remainingCapacity();
-
-      LOG.debug("Work queue load:  [{}/{}]", bufferCapacity - remainingCapacity, bufferCapacity);
-
-      if (remainingCapacity < bufferCapacity * capacityWaterMark)
-        LOG.warn("Work queue remaining capacity: {} is below {}%", remainingCapacity, capacityWaterMark
* 100);
-
-    } catch (InterruptedException ex) {
-      LOG.error("Interrupted while waiting to queue audit event: " + event.getAuditMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a1e572e/ambari-server/src/test/java/org/apache/ambari/server/audit/BufferedAuditLoggerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/audit/BufferedAuditLoggerTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/audit/BufferedAuditLoggerTest.java
deleted file mode 100644
index 445c339..0000000
--- a/ambari-server/src/test/java/org/apache/ambari/server/audit/BufferedAuditLoggerTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.server.audit;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.ambari.server.audit.event.AuditEvent;
-import org.apache.ambari.server.audit.event.OperationStatusAuditEvent;
-import org.apache.ambari.server.configuration.Configuration;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockRule;
-import org.easymock.Mock;
-import org.easymock.MockType;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.newCapture;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertThat;
-
-public class BufferedAuditLoggerTest {
-
-  @Rule
-  public EasyMockRule mocks = new EasyMockRule(this);
-
-  @Mock(type = MockType.NICE)
-  private AuditEvent auditEvent;
-
-  @Mock(type = MockType.STRICT)
-  private AuditLogger auditLogger;
-
-  @Mock(type = MockType.STRICT)
-  private Configuration configuration;
-
-
-  @Before
-  public void setUp() throws Exception {
-    reset(auditEvent, auditLogger);
-
-  }
-
-  @Test(timeout = 300)
-  public void testLog() throws Exception {
-    // Given
-    Capture<AuditEvent> capturedArgument = newCapture();
-    auditLogger.log(capture(capturedArgument));
-
-    EasyMock.expect(configuration.getBufferedAuditLoggerCapacity()).andReturn(50);
-    replay(configuration);
-
-    BufferedAuditLogger bufferedAuditLogger = new BufferedAuditLogger(auditLogger, configuration);
-
-    replay(auditLogger, auditEvent);
-
-
-    // When
-    bufferedAuditLogger.log(auditEvent);
-
-    Thread.sleep(100);
-    // Then
-    verify(auditLogger, configuration);
-
-
-    assertThat(capturedArgument.getValue(), equalTo(auditEvent));
-  }
-
-  @Test(timeout = 300)
-  public void testConsumeAuditEventsFromInternalBuffer() throws Exception {
-    // Given
-    EasyMock.expect(configuration.getBufferedAuditLoggerCapacity()).andReturn(5);
-    replay(configuration);
-    BufferedAuditLogger bufferedAuditLogger = new BufferedAuditLogger(auditLogger, configuration);
-
-    List<AuditEvent> auditEvents = Collections.nCopies(50, auditEvent);
-
-    auditLogger.log((AuditEvent)anyObject(AuditEvent.class));
-    expectLastCall().times(50);
-
-    replay(auditLogger, auditEvent);
-
-    // When
-    for (AuditEvent event : auditEvents) {
-      bufferedAuditLogger.log(event);
-    }
-
-    // Then
-    while (!bufferedAuditLogger.auditEventWorkQueue.isEmpty()) {
-      Thread.sleep(100);
-    }
-
-    verify(auditLogger, auditEvent, configuration);
-  }
-
-  @Test(timeout = 3000)
-  public void testMultipleProducersLogging() throws Exception {
-    // Given
-    int nProducers = 100;
-
-    EasyMock.expect(configuration.getBufferedAuditLoggerCapacity()).andReturn(10000);
-    replay(configuration);
-
-    final BufferedAuditLogger bufferedAuditLogger = new BufferedAuditLogger(new AuditLoggerDefaultImpl(),
configuration);
-
-    ImmutableList.Builder<Thread> producersBuilder = ImmutableList.builder();
-
-    for (int i = 0; i < nProducers; i++) {
-      final Integer reqId = i * 10000;
-      final AuditEvent event =
-        OperationStatusAuditEvent.builder()
-          .withStatus("IN PROGRESS")
-          .withTimestamp(System.currentTimeMillis())
-          .withRequestId(reqId.toString())
-          .build();
-
-      producersBuilder.add(new Thread(new Runnable() {
-        final int nAuditEventsPerProducer = 100;
-
-        @Override
-        public void run() {
-          for (int j = 0; j < nAuditEventsPerProducer; j++) {
-            bufferedAuditLogger.log(event);
-          }
-
-        }
-      }
-
-      ));
-    }
-
-    List<Thread> producers = producersBuilder.build();
-
-
-
-    // When
-    for (Thread producer : producers) {
-      producer.start(); // nProducers threads creating nAuditEventsPerProducer events each
in parallel
-    }
-
-    // Then
-    while (!bufferedAuditLogger.auditEventWorkQueue.isEmpty()) {
-      Thread.sleep(100);
-    }
-
-    verify(configuration);
-
-  }
-}


Mime
View raw message