flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1208691 - in /incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src: main/java/org/apache/flume/channel/jdbc/ main/java/org/apache/flume/channel/jdbc/impl/ test/java/org/apache/flume/channel/jdbc/
Date Wed, 30 Nov 2011 18:43:47 GMT
Author: arvind
Date: Wed Nov 30 18:43:46 2011
New Revision: 1208691

URL: http://svn.apache.org/viewvc?rev=1208691&view=rev
Log:
FLUME-820. Support for capacity specification for JDBC channel.

Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java?rev=1208691&r1=1208690&r2=1208691&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
Wed Nov 30 18:43:46 2011
@@ -54,10 +54,13 @@ public final class ConfigurationConstant
   public static final String CONFIG_MAX_CONNECTION =
       PREFIX + "maximum.connections";
 
+  public static final String CONFIG_MAX_CAPACITY =
+      PREFIX + "maximum.capacity";
+
   // Built in constants for JDBC Channel implementation
 
   /**
-   * The length for payload bytes that will stored inline. Payloads larger
+   * The length for payload bytes that will be stored inline. Payloads larger
    * than this length will spill into BLOB.
    */
   public static int PAYLOAD_LENGTH_THRESHOLD = 16384; // 16kb

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java?rev=1208691&r1=1208690&r2=1208691&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
Wed Nov 30 18:43:46 2011
@@ -261,6 +261,9 @@ public class DerbySchemaHandler implemen
          + "SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE "
          + "SCHEMANAME = ? ))";
 
+  public static final String QUERY_CHANNEL_SIZE
+      = "SELECT COUNT(*) FROM " + TABLE_FL_EVENT;
+
   public static final String STMT_INSERT_EVENT_BASE
       = "INSERT INTO " + TABLE_FL_EVENT + " ("
           + COLUMN_FLE_PAYLOAD + ", " + COLUMN_FLE_CHANNEL + ", "
@@ -1013,4 +1016,41 @@ public class DerbySchemaHandler implemen
 
     return peBuilder.build();
   }
+
+  @Override
+  public long getChannelSize(Connection connection) {
+    long size = 0L;
+    Statement stmt = null;
+    try {
+      stmt = connection.createStatement();
+      stmt.execute(QUERY_CHANNEL_SIZE);
+      ResultSet rset = stmt.getResultSet();
+      if (!rset.next()) {
+        throw new JdbcChannelException("Failed to determine channel size: "
+              + "Query (" + QUERY_CHANNEL_SIZE
+              + ") did not produce any results");
+      }
+
+      size = rset.getLong(1);
+      connection.commit();
+    } catch (SQLException ex) {
+      try {
+        connection.rollback();
+      } catch (SQLException ex2) {
+        LOGGER.error("Unable to rollback transaction", ex2);
+      }
+      throw new JdbcChannelException("Unable to run query: "
+          + QUERY_CHANNEL_SIZE, ex);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close statement", ex);
+        }
+      }
+    }
+
+    return size;
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1208691&r1=1208690&r2=1208691&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
Wed Nov 30 18:43:46 2011
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Enumeration;
@@ -84,6 +85,9 @@ public class JdbcChannelProviderImpl imp
   /** Driver Class Name */
   private String driverClassName;
 
+  /** Capacity Counter if one is needed */
+  private long maxCapacity = 0L;
+
   @Override
   public void initialize(Context context) {
     if (LOGGER.isDebugEnabled()) {
@@ -93,6 +97,27 @@ public class JdbcChannelProviderImpl imp
 
     initializeDataSource(context);
     initializeSchema(context);
+    initializeChannelState(context);
+  }
+
+  private void initializeChannelState(Context context) {
+    String maxCapacityStr = context.getString(
+        ConfigurationConstants.CONFIG_MAX_CAPACITY, "0");
+
+    long maxCapacitySpecified = 0;
+    try {
+      maxCapacitySpecified = Long.parseLong(maxCapacityStr);
+    } catch (NumberFormatException nfe) {
+      LOGGER.warn("Invalid value specified for maximum channel capacity: "
+          + maxCapacityStr, nfe);
+    }
+
+    if (maxCapacitySpecified > 0) {
+      this.maxCapacity = maxCapacitySpecified;
+      LOGGER.debug("Maximum channel capacity: " + maxCapacity);
+    } else {
+      LOGGER.warn("JDBC channel will operate without a capacity limit.");
+    }
   }
 
   private void initializeSchema(Context context) {
@@ -184,6 +209,16 @@ public class JdbcChannelProviderImpl imp
     try {
       tx = getTransaction();
       tx.begin();
+      Connection conn = tx.getConnection();
+
+      if (maxCapacity > 0) {
+        long currentSize = schemaHandler.getChannelSize(conn);
+        if (currentSize >= maxCapacity) {
+          throw new JdbcChannelException("Channel capacity reached: "
+              + "maxCapacity: " + maxCapacity + ", currentSize: "
+              + currentSize);
+        }
+      }
 
       // Persist the persistableEvent
       schemaHandler.storeEvent(persistableEvent, tx.getConnection());

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java?rev=1208691&r1=1208690&r2=1208691&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
Wed Nov 30 18:43:46 2011
@@ -51,7 +51,6 @@ public class MySQLSchemaHandler implemen
   @Override
   public void storeEvent(PersistableEvent pe, Connection connection) {
     // TODO Auto-generated method stub
-
   }
 
   @Override
@@ -61,4 +60,9 @@ public class MySQLSchemaHandler implemen
     return null;
   }
 
+  @Override
+  public long getChannelSize(Connection connection) {
+    // TODO Auto-generated method stub
+    return 0;
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java?rev=1208691&r1=1208690&r2=1208691&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
Wed Nov 30 18:43:46 2011
@@ -65,4 +65,14 @@ public interface SchemaHandler {
    */
   public PersistableEvent fetchAndDeleteEvent(
       String channel, Connection connection);
+
+  /**
+   * Returns the current size of the channel using the connection specified that
+   * must have an active transaction ongoing. This allows the provider impl to
+   * enforce channel capacity limits when persisting events.
+   * @return the current size of the channel.
+   * @param connection
+   * @return
+   */
+  public long getChannelSize(Connection connection);
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java?rev=1208691&r1=1208690&r2=1208691&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
Wed Nov 30 18:43:46 2011
@@ -74,7 +74,10 @@ public final class MockEventUtils {
       int headerNameMargin,	int headerValueMargin, int numHeaders,
       int numChannels) {
 
-    int chIndex = Math.abs(RANDOM.nextInt())%numChannels;
+    int chIndex = 0;
+    if (numChannels > 1) {
+      chIndex = Math.abs(RANDOM.nextInt())%numChannels;
+    }
     String channel = "test-"+chIndex;
 
     StringBuilder sb = new StringBuilder("New Event[payload size:");

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java?rev=1208691&r1=1208690&r2=1208691&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
Wed Nov 30 18:43:46 2011
@@ -75,6 +75,9 @@ public class TestDerbySchemaHandlerQueri
           + "SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE "
           + "SCHEMANAME = ? ))";
 
+  public static final String EXPECTED_QUERY_CHANNEL_SIZE
+      = "SELECT COUNT(*) FROM FLUME.FL_EVENT";
+
   public static final String EXPECTED_STMT_INSERT_EVENT_BASE
       = "INSERT INTO FLUME.FL_EVENT (FLE_PAYLOAD, FLE_CHANNEL, FLE_SPILL) "
           + "VALUES ( ?, ?, ?)";
@@ -167,6 +170,9 @@ public class TestDerbySchemaHandlerQueri
     Assert.assertEquals(DerbySchemaHandler.COLUMN_LOOKUP_QUERY,
         EXPECTED_COLUMN_LOOKUP_QUERY);
 
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CHANNEL_SIZE,
+        EXPECTED_QUERY_CHANNEL_SIZE);
+
     Assert.assertEquals(DerbySchemaHandler.STMT_INSERT_EVENT_BASE,
         EXPECTED_STMT_INSERT_EVENT_BASE);
 

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1208691&r1=1208690&r2=1208691&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
Wed Nov 30 18:43:46 2011
@@ -85,6 +85,44 @@ public class TestJdbcChannelProvider {
   }
 
   @Test
+  public void testDerbyChannelCapacity() {
+    provider = new JdbcChannelProviderImpl();
+
+    derbyCtx.put(ConfigurationConstants.CONFIG_MAX_CAPACITY, "10");
+
+    provider.initialize(derbyCtx);
+
+    Set<MockEvent> events = new HashSet<MockEvent>();
+    for (int i = 1; i < 12; i++) {
+      events.add(MockEventUtils.generateMockEvent(i, i, i, 61%i, 1));
+    }
+
+    Iterator<MockEvent> meIt = events.iterator();
+    int count = 0;
+    while (meIt.hasNext()) {
+      count++;
+      MockEvent me = meIt.next();
+      String chName = me.getChannel();
+      try {
+        provider.persistEvent(chName, me);
+        if (count == 11) {
+          Assert.fail();
+        }
+      } catch (JdbcChannelException ex) {
+        // This is expected if the count is 10
+        Assert.assertEquals(11, count);
+      }
+
+      // Now should be able to remove one event and add this one
+      Event e = provider.removeEvent(chName);
+      Assert.assertNotNull(e);
+
+      // The current event should safely persist now
+      provider.persistEvent(chName, me);
+    }
+  }
+
+  @Test
   public void testDerbySetup() {
     provider = new JdbcChannelProviderImpl();
 
@@ -111,7 +149,7 @@ public class TestJdbcChannelProvider {
   }
 
   /**
-   * Creates 1000 events split over 10 channels, stores them via multiple
+   * Creates 120 events split over 10 channels, stores them via multiple
    * simulated sources and consumes them via multiple simulated channels.
    */
   @Test
@@ -122,7 +160,7 @@ public class TestJdbcChannelProvider {
     Map<String, List<MockEvent>> eventMap =
         new HashMap<String, List<MockEvent>>();
 
-    for (int i = 1; i < 1001; i++) {
+    for (int i = 1; i < 121; i++) {
       MockEvent me = MockEventUtils.generateMockEvent(i, i, i, 61%i, 10);
       List<MockEvent> meList = eventMap.get(me.getChannel());
       if (meList == null) {
@@ -155,20 +193,20 @@ public class TestJdbcChannelProvider {
       srcCount += srcOutput.get();
     }
 
-    Assert.assertEquals(1000, srcCount);
+    Assert.assertEquals(120, srcCount);
 
     int sinkCount = 0;
     for (Future<Integer> sinkOutput : sinkResults) {
       sinkCount += sinkOutput.get();
     }
 
-    Assert.assertEquals(1000, sinkCount);
+    Assert.assertEquals(120, sinkCount);
 
   }
 
 
   /**
-   * creates 1000 events split over 5 channels, stores them
+   * creates 80 events split over 5 channels, stores them
    */
   @Test
   public void testPeristingEvents() {
@@ -179,7 +217,7 @@ public class TestJdbcChannelProvider {
         new HashMap<String, List<MockEvent>>();
 
     Set<MockEvent> events = new HashSet<MockEvent>();
-    for (int i = 1; i < 1001; i++) {
+    for (int i = 1; i < 81; i++) {
       events.add(MockEventUtils.generateMockEvent(i, i, i, 61%i, 5));
     }
 



Mime
View raw message