flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1182209 - in /incubator/flume/branches/flume-728: ./ flume-ng-channels/flume-jdbc-channel/ flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flum...
Date Wed, 12 Oct 2011 05:39:46 GMT
Author: arvind
Date: Wed Oct 12 05:39:45 2011
New Revision: 1182209

URL: http://svn.apache.org/viewvc?rev=1182209&view=rev
Log:
Work related to implementation of JDBC channel

Added:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelException.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/TransactionIsolation.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDatabaseTypeEnum.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestTransactionIsolationLevelEnum.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/pom.xml
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/DatabaseType.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java
    incubator/flume/branches/flume-728/pom.xml

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/pom.xml?rev=1182209&r1=1182208&r2=1182209&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/pom.xml (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/pom.xml Wed Oct 12 05:39:45 2011
@@ -42,10 +42,23 @@ limitations under the License.
       <artifactId>log4j</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-dbcp</groupId>
+      <artifactId>commons-dbcp</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java?rev=1182209&r1=1182208&r2=1182209&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java Wed Oct 12 05:39:45 2011
@@ -24,12 +24,9 @@ public final class ConfigurationConstant
 
   public static final String PREFIX = "org.apache.flume.channel.jdbc.";
 
-  public static final String CONFIG_DRIVER_CLASS =
+  public static final String CONFIG_JDBC_DRIVER_CLASS =
       PREFIX + "driver.class";
 
-  public static final String CONFIG_DATABASE_TYPE =
-      PREFIX + "db.type";
-
   public static final String CONFIG_USERNAME =
       PREFIX + "db.username";
 
@@ -40,11 +37,20 @@ public final class ConfigurationConstant
       PREFIX + "driver.url";
 
   public static final String CONFIG_JDBC_PROPERTIES_FILE =
-      PREFIX + "properties.file";
+      PREFIX + "connection.properties.file";
+
+  public static final String CONFIG_DATABASE_TYPE =
+      PREFIX + "db.type";
 
   public static final String CONFIG_CREATE_SCHEMA =
       PREFIX + "create.schema";
 
+  public static final String CONFIG_TX_ISOLATION_LEVEL =
+      PREFIX + "transaction.isolation";
+
+  public static final String CONFIG_MAX_CONNECTION =
+      PREFIX + "maximum.connections";
+
   private ConfigurationConstants() {
     // Disable object creation
   }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/DatabaseType.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/DatabaseType.java?rev=1182209&r1=1182208&r2=1182209&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/DatabaseType.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/DatabaseType.java Wed Oct 12 05:39:45 2011
@@ -1,18 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flume.channel.jdbc;
 
 public enum DatabaseType {
   /** All other databases */
-  OTHER,
+  OTHER("OTHER", null),
 
   /** Apache Derby */
-  DERBY,
+  DERBY("DERBY", "values(1)"),
 
   /** MySQL */
-  MYSQL,
+  MYSQL("MYSQL", "select 1"),
 
   /** PostgreSQL */
-  PGSQL,
+  POSTGRESQL("POSTGRESQL", null),
 
   /** Oracle */
-  ORACLE;
+  ORACLE("ORACLE", null);
+
+  private final String name;
+  private final String validationQuery;
+
+  private DatabaseType(String name, String validationQuery) {
+    this.name = name;
+    this.validationQuery = validationQuery;
+  }
+
+  public String toString() {
+    return getName();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getValidationQuery() {
+    return validationQuery;
+  }
+
+  public static DatabaseType getByName(String dbName) {
+    DatabaseType type = null;
+    try {
+      type = DatabaseType.valueOf(dbName.trim().toUpperCase());
+    } catch (IllegalArgumentException ex) {
+      type = DatabaseType.OTHER;
+    }
+
+    return type;
+  }
+
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java?rev=1182209&r1=1182208&r2=1182209&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java Wed Oct 12 05:39:45 2011
@@ -43,25 +43,22 @@ public class JdbcChannel implements Chan
     provider = JdbcChannelProviderFactory.getProvider(configuration);
     this.name = name;
 
-    LOG.info("JDBC Channel initialied: " + name);
+    LOG.info("JDBC Channel initialized: " + name);
   }
 
   @Override
   public void put(Event event) throws ChannelException {
-    // TODO Auto-generated method stub
-
+    getProvider().persistEvent(getName(), event);
   }
 
   @Override
   public Event take() throws ChannelException {
-    // TODO Auto-generated method stub
-    return null;
+    return getProvider().removeEvent(getName());
   }
 
   @Override
   public Transaction getTransaction() {
-    // TODO Auto-generated method stub
-    return null;
+    return getProvider().getTransaction();
   }
 
   @Override
@@ -71,7 +68,10 @@ public class JdbcChannel implements Chan
 
   @Override
   public String getName() {
-    // FIXME
-    return null;
+    return name;
+  }
+
+  private JdbcChannelProvider getProvider() {
+    return provider;
   }
 }

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelException.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelException.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelException.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelException.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import org.apache.flume.ChannelException;
+
+public class JdbcChannelException extends ChannelException {
+
+  public JdbcChannelException(String message) {
+    super(message);
+  }
+
+  public JdbcChannelException(String message, Exception cause) {
+    super(message, cause);
+  }
+
+  public JdbcChannelException(Exception cause) {
+    super(cause);
+  }
+
+}

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java?rev=1182209&r1=1182208&r2=1182209&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java Wed Oct 12 05:39:45 2011
@@ -19,6 +19,9 @@ package org.apache.flume.channel.jdbc;
 
 import java.util.Properties;
 
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+
 /**
  * Service provider interface for JDBC channel providers.
  */
@@ -36,4 +39,26 @@ public interface JdbcChannelProvider {
    * channel provider cannot be used and must be discarded.
    */
   public void close();
+
+  /**
+   * Writes the event to the persistent store.
+   * @param channelName
+   * @param event
+   */
+  public void persistEvent(String channelName, Event event);
+
+
+  /**
+   * Removes the next event for the named channel from the underlying
+   * persistent store.
+   * @param channelName
+   * @return
+   */
+  public Event removeEvent(String channelName);
+
+  /**
+   * @return the transaction associated with the current thread.
+   */
+  public Transaction getTransaction();
+
 }

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/TransactionIsolation.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/TransactionIsolation.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/TransactionIsolation.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/TransactionIsolation.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.sql.Connection;
+
+public enum TransactionIsolation {
+
+  READ_UNCOMMITTED("READ_UNCOMMITTED", Connection.TRANSACTION_READ_UNCOMMITTED),
+  READ_COMMITTED("READ_COMMITTED", Connection.TRANSACTION_READ_COMMITTED),
+  REPEATABLE_READ("REPEATABLE_READ", Connection.TRANSACTION_REPEATABLE_READ),
+  SERIALIZABLE("SERIALIZABLE", Connection.TRANSACTION_SERIALIZABLE);
+
+  private final String name;
+  private final int code;
+
+  private TransactionIsolation(String name, int code) {
+    this.name = name;
+    this.code = code;
+  }
+
+  public int getCode() {
+    return code;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String toString() {
+    return getName();
+  }
+
+  public static TransactionIsolation getByName(String name) {
+    return valueOf(name.trim().toUpperCase());
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.sql.DataSource;
+
+import org.apache.flume.channel.jdbc.JdbcChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DerbySchemaHandler implements SchemaHandler {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(DerbySchemaHandler.class);
+
+
+  private static final String QUREY_SYSCHEMA_FLUME
+      = "SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = 'FLUME'";
+
+  private static final String SCHEMA_FLUME = "FLUME";
+
+  private static final String TABLE_FL_EVENT_NAME = "FL_EVENT";
+  private static final String TABLE_FL_EVENT = SCHEMA_FLUME + "."
+      + TABLE_FL_EVENT_NAME;
+  private static final String COLUMN_FLE_ID = "FLE_ID";
+  private static final String COLUMN_FLE_PAYLOAD = "FLE_PAYLOAD";
+  private static final String COLUMN_FLE_CHANNEL = "FLE_CHANNEL";
+
+  private static final String TABLE_FL_PLEXT_NAME = "FL_PLEXT";
+  private static final String TABLE_FL_PLEXT = SCHEMA_FLUME + "."
+      + TABLE_FL_PLEXT_NAME;
+  private static final String COLUMN_FLP_EVENTID = "FLP_EVENTID";
+  private static final String COLUMN_FLP_SPILL = "FLP_SPILL";
+
+  private static final String TABLE_FL_HEADER_NAME = "FL_HEADER";
+  private static final String TABLE_FL_HEADER = SCHEMA_FLUME + "."
+      + TABLE_FL_HEADER_NAME;
+  private static final String COLUMN_FLH_EVENTID = "FLH_EVENTID";
+  private static final String COLUMN_FLH_NAME = "FLH_NAME";
+  private static final String COLUMN_FLH_VALUE = "FLH_VALUE";
+
+
+  public static final String QUERY_CREATE_SCHEMA_FLUME
+      = "CREATE SCHEMA " + SCHEMA_FLUME;
+
+  public static final String QUERY_CREATE_TABLE_FL_EVENT
+      = "CREATE TABLE " + TABLE_FL_EVENT + " ( "
+        + COLUMN_FLE_ID + " BIGINT GENERATED ALWAYS AS IDENTITY "
+        + "(START WITH 2, INCREMENT BY 1) PRIMARY KEY, "
+        + COLUMN_FLE_PAYLOAD + " VARCHAR(16384) FOR BIT DATA, " // 16kb
+        + COLUMN_FLE_CHANNEL + " VARCHAR(32))";
+
+  public static final String QUERY_CREATE_TABLE_FL_PLEXT
+      = "CREATE TABLE " + TABLE_FL_PLEXT + " ( "
+        + COLUMN_FLP_EVENTID + " BIGINT, "
+        + COLUMN_FLP_SPILL + " BLOB, "
+        + "FOREIGN KEY (" + COLUMN_FLP_EVENTID + ") REFERENCES "
+        + TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + "))";
+
+  public static final String QUERY_CREATE_TABLE_FL_HEADER
+      = "CREATE TABLE " + TABLE_FL_HEADER + " ( "
+        + COLUMN_FLH_EVENTID + " BIGINT, "
+        + COLUMN_FLH_NAME + " VARCHAR(255), "
+        + COLUMN_FLH_VALUE + " VARCHAR(255), "
+        + "FOREIGN KEY (" + COLUMN_FLH_EVENTID + ") REFERENCES "
+        + TABLE_FL_EVENT + " (" + COLUMN_FLE_ID + "))";
+
+  public static final String COLUMN_LOOKUP_QUERY =
+      "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
+         + "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
+         + "SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE "
+         + "SCHEMANAME = ? ))";
+
+  private final DataSource dataSource;
+
+  protected DerbySchemaHandler(DataSource dataSource) {
+    this.dataSource = dataSource;
+  }
+
+
+  @Override
+  public boolean schemaExists() {
+    Connection connection = null;
+    ResultSet rset = null;
+    try {
+      connection = dataSource.getConnection();
+      Statement stmt = connection.createStatement();
+      rset = stmt.executeQuery(QUREY_SYSCHEMA_FLUME);
+      if (!rset.next()) {
+        LOGGER.warn("Schema for FLUME does not exist");
+        return false;
+      }
+
+      String flumeSchemaId = rset.getString(1);
+      LOGGER.debug("Flume schema ID: " + flumeSchemaId);
+
+      connection.commit();
+    } catch (SQLException ex) {
+      try {
+        connection.rollback();
+      } catch (SQLException ex2) {
+        LOGGER.error("Unable to rollback transaction", ex2);
+      }
+      throw new JdbcChannelException("Unable to query schema", ex);
+    } finally {
+      if (rset != null) {
+        try {
+          rset.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close result set", ex);
+        }
+      }
+      if (connection != null) {
+        try {
+          connection.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close connection", ex);
+        }
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public void createSchemaObjects() {
+    runQuery(QUERY_CREATE_SCHEMA_FLUME);
+    runQuery(QUERY_CREATE_TABLE_FL_EVENT);
+    runQuery(QUERY_CREATE_TABLE_FL_PLEXT);
+    runQuery(QUERY_CREATE_TABLE_FL_HEADER);
+  }
+
+  @Override
+  public void validateSchema() {
+    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_EVENT_NAME,
+        COLUMN_FLE_ID, COLUMN_FLE_PAYLOAD, COLUMN_FLE_CHANNEL);
+
+    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_PLEXT_NAME,
+        COLUMN_FLP_EVENTID, COLUMN_FLP_SPILL);
+
+    verifyTableStructure(SCHEMA_FLUME, TABLE_FL_HEADER_NAME,
+        COLUMN_FLH_EVENTID, COLUMN_FLH_NAME, COLUMN_FLH_VALUE);
+  }
+
+  private void verifyTableStructure(String schemaName, String tableName,
+      String... columns) {
+    Set<String> columnNames = new HashSet<String>();
+    Connection connection = null;
+    PreparedStatement pStmt = null;
+    try {
+      connection = dataSource.getConnection();
+      pStmt = connection.prepareStatement(COLUMN_LOOKUP_QUERY);
+      pStmt.setString(1, tableName);
+      pStmt.setString(2, schemaName);
+      ResultSet rset = pStmt.executeQuery();
+
+      while (rset.next()) {
+        columnNames.add(rset.getString(1));
+      }
+      connection.commit();
+    } catch (SQLException ex) {
+      try {
+        connection.rollback();
+      } catch (SQLException ex2) {
+        LOGGER.error("Unable to rollback transaction", ex2);
+      }
+      throw new JdbcChannelException("Unable to run query: "
+          + COLUMN_LOOKUP_QUERY + ": 1=" + tableName + ", 2=" + schemaName, ex);
+    } finally {
+      if (pStmt != null) {
+        try {
+          pStmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close statement", ex);
+        }
+        if (connection != null) {
+          try {
+            connection.close();
+          } catch (SQLException ex) {
+            LOGGER.error("Unable to close connection", ex);
+          }
+        }
+      }
+    }
+
+    Set<String> columnDiff = new HashSet<String>();
+    columnDiff.addAll(columnNames);
+
+    // Expected Column string form
+    StringBuilder sb = new StringBuilder("{");
+    boolean first = true;
+    for (String column : columns) {
+      columnDiff.remove(column);
+      if (first) {
+        first = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append(column);
+    }
+    sb.append("}");
+
+    String expectedColumns = sb.toString();
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Table " + schemaName + "." + tableName
+          + " expected columns: " + expectedColumns + ", actual columns: "
+          + columnNames);
+    }
+
+    if (columnNames.size() != columns.length || columnDiff.size() != 0) {
+      throw new JdbcChannelException("Expected table " + schemaName + "."
+          + tableName + " to have columns: " + expectedColumns + ". Instead "
+          + "found columns: " + columnNames);
+    }
+  }
+
+  private void runQuery(String query) {
+    Connection connection = null;
+    Statement stmt = null;
+    try {
+      connection = dataSource.getConnection();
+      stmt = connection.createStatement();
+      if (stmt.execute(query)) {
+        ResultSet rset = stmt.getResultSet();
+        int count = 0;
+        while (rset.next()) {
+          count++;
+        }
+        LOGGER.info("QUERY(" + query + ") produced unused resultset with "
+            + count + " rows");
+      } else {
+        int updateCount = stmt.getUpdateCount();
+        LOGGER.info("QUERY(" + query + ") Update count: " + updateCount);
+      }
+      connection.commit();
+    } catch (SQLException ex) {
+      try {
+        connection.rollback();
+      } catch (SQLException ex2) {
+        LOGGER.error("Unable to rollback transaction", ex2);
+      }
+      throw new JdbcChannelException("Unable to run query: "
+          + query, ex);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOGGER.error("Unable to close statement", ex);
+        }
+        if (connection != null) {
+          try {
+            connection.close();
+          } catch (SQLException ex) {
+            LOGGER.error("Unable to close connection", ex);
+          }
+        }
+      }
+    }
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Enumeration;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbcp.ConnectionFactory;
+import org.apache.commons.dbcp.DriverManagerConnectionFactory;
+import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.dbcp.PoolingDataSource;
+import org.apache.commons.pool.KeyedObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.jdbc.ConfigurationConstants;
+import org.apache.flume.channel.jdbc.DatabaseType;
+import org.apache.flume.channel.jdbc.JdbcChannelException;
+import org.apache.flume.channel.jdbc.JdbcChannelProvider;
+import org.apache.flume.channel.jdbc.TransactionIsolation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcChannelProviderImpl implements JdbcChannelProvider {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(JdbcChannelProviderImpl.class);
+
+
+  private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME
+        = "org.apache.derby.jdbc.EmbeddedDriver";
+
+  private static final String DEFAULT_DRIVER_CLASSNAME
+         = EMBEDDED_DERBY_DRIVER_CLASSNAME;
+  private static final String DEFAULT_USERNAME = "sa";
+  private static final String DEFAULT_PASSWORD = "";
+  private static final String DEFAULT_DBTYPE = "DERBY";
+
+  /** The connection pool. */
+  private GenericObjectPool connectionPool;
+
+  /** The statement cache pool */
+  private KeyedObjectPoolFactory statementPool;
+
+  /** The data source. */
+  private DataSource dataSource;
+
+  /** The database type. */
+  private DatabaseType databaseType;
+
+  /** The schema handler. */
+  private SchemaHandler schemaHandler;
+
+  /** Transaction factory */
+  private JdbcTransactionFactory txFactory;
+
+  /** Connection URL */
+  private String connectUrl;
+
+  /** Driver Class Name */
+  private String driverClassName;
+
+  @Override
+  public void initialize(Properties properties) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Initializing JDBC Channel provider with props: "
+          + properties);
+    }
+
+    initializeDataSource(properties);
+    initializeSchema(properties);
+  }
+
+  private void initializeSchema(Properties properties) {
+    String createSchemaFlag = properties.getProperty(
+        ConfigurationConstants.CONFIG_CREATE_SCHEMA, "true");
+
+    boolean createSchema = Boolean.valueOf(createSchemaFlag);
+    LOGGER.debug("Create schema flag set to: " + createSchema);
+
+    // First check if the schema exists
+    schemaHandler = SchemaHandlerFactory.getHandler(databaseType, dataSource);
+
+    if (!schemaHandler.schemaExists()) {
+      if (!createSchema) {
+        throw new JdbcChannelException("Schema does not exist and "
+            + "auto-generation is disabled. Please enable auto-generation of "
+            + "schema and try again.");
+      }
+
+      // Now create schema
+      schemaHandler.createSchemaObjects();
+    }
+
+    // Validate all schema objects are as expected
+    schemaHandler.validateSchema();
+  }
+
+
+  @Override
+  public void close() {
+    try {
+      connectionPool.close();
+    } catch (Exception ex) {
+      throw new JdbcChannelException("Unable to close connection pool", ex);
+    }
+
+    if (databaseType.equals(DatabaseType.DERBY)
+        && driverClassName.equals(EMBEDDED_DERBY_DRIVER_CLASSNAME)) {
+      // Need to shut down the embedded Derby instance
+      if (connectUrl.startsWith("jdbc:derby:")) {
+        int index = connectUrl.indexOf(";");
+        String baseUrl = null;
+        if (index != -1) {
+          baseUrl = connectUrl.substring(0, index+1);
+        } else {
+          baseUrl = connectUrl + ";";
+        }
+        String shutDownUrl = baseUrl + "shutdown=true";
+
+        LOGGER.debug("Attempting to shutdown embedded Derby using URL: "
+            + shutDownUrl);
+
+        try {
+          DriverManager.getConnection(shutDownUrl);
+        } catch (SQLException ex) {
+          // Shutdown for one db instance is expected to raise SQL STATE 45000
+          if (ex.getErrorCode() != 45000) {
+            throw new JdbcChannelException(
+                "Unable to shutdown embedded Derby: " + shutDownUrl
+                + " Error Code: " + ex.getErrorCode(), ex);
+          }
+          LOGGER.info("Embedded Derby shutdown raised SQL STATE "
+              + "45000 as expected.");
+        }
+      } else {
+        LOGGER.warn("Even though embedded Derby drvier was loaded, the connect "
+            + "URL is of an unexpected form: " + connectUrl + ". Therfore no "
+            + "attempt will be made to shutdown embedded Derby instance.");
+      }
+    }
+
+    dataSource = null;
+    txFactory = null;
+    schemaHandler = null;
+  }
+
+  @Override
+  public void persistEvent(String channelName, Event event) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public Event removeEvent(String channelName) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public Transaction getTransaction() {
+    return txFactory.get();
+  }
+
+  /**
+
+   * Initializes the datasource and the underlying connection pool.
+   * @param properties
+   */
+  private void initializeDataSource(Properties properties) {
+    driverClassName = properties.getProperty(
+        ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS);
+
+    connectUrl = properties.getProperty(ConfigurationConstants.CONFIG_URL);
+
+
+    String userName =
+        properties.getProperty(ConfigurationConstants.CONFIG_USERNAME);
+
+    String password =
+        properties.getProperty(ConfigurationConstants.CONFIG_PASSWORD);
+
+    String jdbcPropertiesFile = properties.getProperty(
+        ConfigurationConstants.CONFIG_JDBC_PROPERTIES_FILE);
+
+    String dbTypeName = properties.getProperty(
+        ConfigurationConstants.CONFIG_DATABASE_TYPE);
+
+    // If connect URL is not specified, use embedded Derby
+    if (connectUrl == null || connectUrl.trim().length() == 0) {
+      LOGGER.warn("No connection URL specified. "
+          + "Using embedded derby database instance.");
+
+      driverClassName = DEFAULT_DRIVER_CLASSNAME;
+      userName = DEFAULT_USERNAME;
+      password = DEFAULT_PASSWORD;
+      dbTypeName = DEFAULT_DBTYPE;
+
+      String homePath = System.getProperty("user.home").replace('\\', '/');
+
+      String defaultDbDir = homePath + "/.flume/jdbc-channel";
+
+
+      File dbDir = new File(defaultDbDir);
+      String canonicalDbDirPath = null;
+
+      try {
+        canonicalDbDirPath = dbDir.getCanonicalPath();
+      } catch (IOException ex) {
+        throw new JdbcChannelException("Unable to find canonical path of dir: "
+            + defaultDbDir, ex);
+      }
+
+      if (!dbDir.exists()) {
+        if (!dbDir.mkdirs()) {
+          throw new JdbcChannelException("unable to create directory: "
+              + canonicalDbDirPath);
+        }
+      }
+
+      connectUrl = "jdbc:derby:" + canonicalDbDirPath + "/db;create=true";
+
+      // No jdbc properties file will be used
+      jdbcPropertiesFile = null;
+
+      LOGGER.warn("Overriding values for - driver: " + driverClassName
+          + ", user: " + userName + "connectUrl: " + connectUrl
+          + ", jdbc properties file: " + jdbcPropertiesFile
+          + ", dbtype: " + dbTypeName);
+    }
+
+    // Right now only Derby and MySQL supported
+    databaseType = DatabaseType.getByName(dbTypeName);
+
+    switch (databaseType) {
+    case DERBY:
+    case MYSQL:
+      break;
+    default:
+      throw new JdbcChannelException("Database " + databaseType
+          + " not supported at this time");
+    }
+
+    // Register driver
+    if (driverClassName == null || driverClassName.trim().length() == 0) {
+      throw new JdbcChannelException("No jdbc driver specified");
+    }
+
+    try {
+      Class.forName(driverClassName);
+    } catch (ClassNotFoundException ex) {
+      throw new JdbcChannelException("Unable to load driver: "
+                  + driverClassName, ex);
+    }
+
+    // JDBC Properties
+    Properties jdbcProps = new Properties();
+
+    if (jdbcPropertiesFile != null && jdbcPropertiesFile.trim().length() > 0) {
+      File jdbcPropsFile = new File(jdbcPropertiesFile.trim());
+      if (!jdbcPropsFile.exists()) {
+        throw new JdbcChannelException("Jdbc properties file does not exist: "
+            + jdbcPropertiesFile);
+      }
+
+      InputStream inStream = null;
+      try {
+        inStream = new FileInputStream(jdbcPropsFile);
+        jdbcProps.load(inStream);
+      } catch (IOException ex) {
+        throw new JdbcChannelException("Unable to load jdbc properties "
+            + "from file: " + jdbcPropertiesFile, ex);
+      } finally {
+        if (inStream != null) {
+          try {
+            inStream.close();
+          } catch (IOException ex) {
+            LOGGER.error("Unable to close file: " + jdbcPropertiesFile, ex);
+          }
+        }
+      }
+    }
+
+    if (userName != null) {
+      Object oldUser = jdbcProps.put("user", userName);
+      if (oldUser != null) {
+        LOGGER.warn("Overriding user from: " + oldUser + " to: " + userName);
+      }
+    }
+
+    if (password != null) {
+      Object oldPass = jdbcProps.put("password", password);
+      if (oldPass != null) {
+        LOGGER.warn("Overriding password from the jdbc properties with "
+            + " the one specified explicitly.");
+      }
+    }
+
+    if (LOGGER.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("JDBC Properties {");
+      boolean first = true;
+      Enumeration<?> propertyKeys = jdbcProps.propertyNames();
+      while (propertyKeys.hasMoreElements()) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        String key = (String) propertyKeys.nextElement();
+        sb.append(key).append("=");
+        if (key.equalsIgnoreCase("password")) {
+          sb.append("*******");
+        } else {
+          sb.append(jdbcProps.get(key));
+        }
+      }
+
+      sb.append("}");
+
+      LOGGER.debug(sb.toString());
+    }
+
+    // Transaction Isolation
+    String txIsolation = properties.getProperty(
+        ConfigurationConstants.CONFIG_TX_ISOLATION_LEVEL,
+        TransactionIsolation.READ_COMMITTED.getName());
+
+    TransactionIsolation txIsolationLevel =
+        TransactionIsolation.getByName(txIsolation);
+
+    LOGGER.debug("Transaction isolation will be set to: " + txIsolationLevel);
+
+    // Setup Datasource
+    ConnectionFactory connFactory =
+        new DriverManagerConnectionFactory(connectUrl, jdbcProps);
+
+    connectionPool = new GenericObjectPool();
+
+    String maxActiveConnections = properties.getProperty(
+        ConfigurationConstants.CONFIG_MAX_CONNECTION, "10");
+
+    int maxActive = 10;
+    if (maxActiveConnections != null && maxActiveConnections.length() > 0) {
+      try {
+        maxActive = Integer.parseInt(maxActiveConnections);
+      } catch (NumberFormatException nfe) {
+        LOGGER.warn("Max active connections has invalid value: "
+                + maxActiveConnections + ", Using default: " + maxActive);
+      }
+    }
+
+    LOGGER.debug("Max active connections for the pool: " + maxActive);
+    connectionPool.setMaxActive(maxActive);
+
+    statementPool = new GenericKeyedObjectPoolFactory(null);
+
+    // Creating the factory instance automatically wires the connection pool
+    new PoolableConnectionFactory(connFactory, connectionPool, statementPool,
+        databaseType.getValidationQuery(), false, false,
+        txIsolationLevel.getCode());
+
+    dataSource = new PoolingDataSource(connectionPool);
+
+    txFactory = new JdbcTransactionFactory(dataSource);
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionFactory.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import java.sql.SQLException;
+
+import javax.sql.DataSource;
+
+import org.apache.flume.channel.jdbc.JdbcChannelException;
+
+public class JdbcTransactionFactory extends ThreadLocal<JdbcTransactionImpl> {
+
+  private final DataSource dataSource;
+
+  protected JdbcTransactionFactory(DataSource dataSource) {
+    super();
+    this.dataSource = dataSource;
+  }
+
+  @Override
+  protected JdbcTransactionImpl initialValue() {
+    try {
+      return new JdbcTransactionImpl(dataSource.getConnection(), this);
+    } catch (SQLException ex) {
+      throw new JdbcChannelException("Unable to create connection", ex);
+    }
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.jdbc.JdbcChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JdbcTransactionImpl implements Transaction {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(JdbcTransactionImpl.class);
+
+  private Connection connection;
+  private JdbcTransactionFactory txFactory;
+  boolean active = true;
+  int count = 0;
+
+  boolean rollback = false;
+
+  protected JdbcTransactionImpl(Connection conn,
+      JdbcTransactionFactory factory) {
+    connection = conn;
+    txFactory = factory;
+
+    try {
+      connection.clearWarnings();
+    } catch (SQLException ex) {
+      LOGGER.error("Error while clearing warnings: " + ex.getErrorCode(), ex);
+    }
+  }
+
+  @Override
+  public void begin() {
+    if (!active) {
+      throw new JdbcChannelException("Inactive transaction");
+    }
+    count++;
+    LOGGER.debug("Tx count-begin: " + count + ", rollback: " + rollback);
+  }
+
+  @Override
+  public void commit() {
+    if (!active) {
+      throw new JdbcChannelException("Inactive transaction");
+    }
+    if (rollback) {
+      throw new JdbcChannelException(
+          "Cannot commit transaction marked for rollback");
+    }
+    LOGGER.debug("Tx count-commit: " + count + ", rollback: " + rollback);
+  }
+
+  @Override
+  public void rollback() {
+    if (!active) {
+      throw new JdbcChannelException("Inactive transaction");
+    }
+    rollback = true;
+    LOGGER.debug("Tx count-rollback: " + count + ", rollback: " + rollback);
+  }
+
+  @Override
+  public void close() {
+    if (!active) {
+      throw new JdbcChannelException("Inactive transaction");
+    }
+    count--;
+    LOGGER.debug("Tx count-close: " + count + ", rollback: " + rollback);
+    if (count == 0) {
+      active = false;
+      try {
+        if (rollback) {
+          LOGGER.info("Attempting transaction roll-back");
+          connection.rollback();
+        } else {
+          LOGGER.info("Attempting transaction commit");
+          connection.commit();
+        }
+      } catch (SQLException ex) {
+        throw new JdbcChannelException("Unable to finalize transaction", ex);
+      } finally {
+        if (connection != null) {
+          // Log Warnings
+          try {
+            SQLWarning warning = connection.getWarnings();
+            if (warning != null) {
+              StringBuilder sb = new StringBuilder("Connection warnigns: ");
+              boolean first = true;
+              while (warning != null) {
+                if (first) {
+                  first = false;
+                } else {
+                  sb.append("; ");
+                }
+                sb.append("[").append(warning.getErrorCode()).append("] ");
+                sb.append(warning.getMessage());
+              }
+              LOGGER.warn(sb.toString());
+            }
+          } catch (SQLException ex) {
+            LOGGER.error("Error while retrieving warnigns: "
+                                + ex.getErrorCode(), ex);
+          }
+
+          // Close Connection
+          try {
+            connection.close();
+          } catch (SQLException ex) {
+            LOGGER.error(
+                "Unable to close connection: " + ex.getErrorCode(), ex);
+          }
+        }
+
+        // Clean up thread local
+        txFactory.remove();
+
+        // Destroy local state
+        connection = null;
+        txFactory = null;
+      }
+    }
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,31 @@
+package org.apache.flume.channel.jdbc.impl;
+
+import javax.sql.DataSource;
+
+public class MySQLSchemaHandler implements SchemaHandler {
+
+  private final DataSource dataSource;
+
+  protected MySQLSchemaHandler(DataSource dataSource) {
+    this.dataSource = dataSource;
+  }
+
+  @Override
+  public boolean schemaExists() {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  @Override
+  public void validateSchema() {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void createSchemaObjects() {
+    // TODO Auto-generated method stub
+
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import javax.sql.DataSource;
+
+/**
+ * <p>A handler for creating and validating database schema for use by
+ * the JDBC channel implementation.</p>
+ */
+public interface SchemaHandler {
+
+  /**
+   * @param connection the connection to check for schema.
+   * @return true if the schema exists. False otherwise.
+   */
+  public boolean schemaExists();
+
+  /**
+   * Validates the schema.
+   * @param connection
+   */
+  public void validateSchema();
+
+  /**
+   * Creates the schema.
+   * @param connection the connection to create schema for.
+   */
+  public void createSchemaObjects();
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc.impl;
+
+import javax.sql.DataSource;
+
+import org.apache.flume.channel.jdbc.DatabaseType;
+import org.apache.flume.channel.jdbc.JdbcChannelException;
+
+/**
+ * <p>A factory for SchemaHandlers.</p>
+ */
+public final class SchemaHandlerFactory {
+
+  public static SchemaHandler getHandler(DatabaseType dbType,
+      DataSource dataSource) {
+    SchemaHandler handler = null;
+    switch(dbType) {
+    case DERBY:
+      handler = new DerbySchemaHandler(dataSource);
+      break;
+    case MYSQL:
+      handler = new MySQLSchemaHandler(dataSource);
+      break;
+    default:
+      throw new JdbcChannelException("Database " + dbType
+          + " not supported yet");
+    }
+
+    return handler;
+  }
+
+  private SchemaHandlerFactory() {
+    // Disable explicit object creation
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDatabaseTypeEnum.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDatabaseTypeEnum.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDatabaseTypeEnum.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDatabaseTypeEnum.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * The purpose of this test is to guard against accidental backward
+ * compatibility problem since the string representation so of DatabaseType enum
+ * are a public interface used in configuration.
+ */
+public class TestDatabaseTypeEnum {
+
+  public static final String DBTYPE_OTHER = "OTHER";
+  public static final String DBTYPE_DERBY = "DERBY";
+  public static final String DBTYPE_MYSQL = "MYSQL";
+  public static final String DBTYPE_PGSQL = "POSTGRESQL";
+  public static final String DBTYPE_ORACLE = "ORACLE";
+
+  private Map<String, DatabaseType> enumMap =
+      new HashMap<String, DatabaseType>();
+
+  @Before
+  public void setUp() {
+    enumMap.clear();
+    enumMap.put(DBTYPE_OTHER, DatabaseType.OTHER);
+    enumMap.put(DBTYPE_DERBY, DatabaseType.DERBY);
+    enumMap.put(DBTYPE_MYSQL, DatabaseType.MYSQL);
+    enumMap.put(DBTYPE_PGSQL, DatabaseType.POSTGRESQL);
+    enumMap.put(DBTYPE_ORACLE, DatabaseType.ORACLE);
+  }
+
+  @Test
+  public void testDatabaseTypeLookup() {
+    for (String key : enumMap.keySet()) {
+      DatabaseType type = enumMap.get(key);
+      DatabaseType lookupType = DatabaseType.valueOf(key);
+      String lookupTypeName = lookupType.getName();
+
+      Assert.assertEquals(lookupTypeName, lookupType.toString());
+      Assert.assertSame(type, lookupType);
+      Assert.assertEquals(key, lookupTypeName);
+
+      DatabaseType lookupType2 = DatabaseType.getByName(key.toLowerCase());
+      Assert.assertSame(type, lookupType2);
+    }
+  }
+
+  @Test
+  public void testUnknonwnDatabaseTypeLookup() {
+    String[] invalidTypes = new String[] { "foo", "bar", "abcd" };
+
+    for (String key : invalidTypes) {
+      DatabaseType type = DatabaseType.getByName(key);
+
+      Assert.assertSame(type, DatabaseType.OTHER);
+    }
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import org.apache.flume.channel.jdbc.impl.DerbySchemaHandler;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDerbySchemaHandlerQueries {
+
+  public static final String EXPECTED_QUERY_CREATE_SCHEMA_FLUME
+       = "CREATE SCHEMA FLUME";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_EVENT
+       = "CREATE TABLE FLUME.FL_EVENT ( FLE_ID BIGINT GENERATED ALWAYS AS "
+           + "IDENTITY (START WITH 2, INCREMENT BY 1) PRIMARY KEY, FLE_PAYLOAD "
+           + "VARCHAR(16384) FOR BIT DATA, FLE_CHANNEL VARCHAR(32))";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_PLEXT
+       = "CREATE TABLE FLUME.FL_PLEXT ( FLP_EVENTID BIGINT, FLP_SPILL BLOB, "
+           + "FOREIGN KEY (FLP_EVENTID) REFERENCES FLUME.FL_EVENT (FLE_ID))";
+
+  public static final String EXPECTED_QUERY_CREATE_TABLE_FL_HEADER
+       = "CREATE TABLE FLUME.FL_HEADER ( FLH_EVENTID BIGINT, FLH_NAME "
+           + "VARCHAR(255), FLH_VALUE VARCHAR(255), FOREIGN KEY (FLH_EVENTID) "
+           + "REFERENCES FLUME.FL_EVENT (FLE_ID))";
+
+  @Test
+  public void testCreateQueries() {
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_SCHEMA_FLUME,
+        EXPECTED_QUERY_CREATE_SCHEMA_FLUME);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_EVENT,
+        EXPECTED_QUERY_CREATE_TABLE_FL_EVENT);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_PLEXT,
+        EXPECTED_QUERY_CREATE_TABLE_FL_PLEXT);
+
+    Assert.assertEquals(DerbySchemaHandler.QUERY_CREATE_TABLE_FL_HEADER,
+        EXPECTED_QUERY_CREATE_TABLE_FL_HEADER);
+
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,67 @@
+package org.apache.flume.channel.jdbc;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestJdbcChannelProvider {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TestJdbcChannelProvider.class);
+
+  private Properties derbyProps = new Properties();
+  private File derbyDbDir;
+
+  @Before
+  public void setUp() throws IOException {
+    derbyProps.clear();
+    derbyProps.put(ConfigurationConstants.CONFIG_CREATE_SCHEMA, "true");
+    derbyProps.put(ConfigurationConstants.CONFIG_DATABASE_TYPE, "DERBY");
+    derbyProps.put(ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS,
+        "org.apache.derby.jdbc.EmbeddedDriver");
+
+    derbyProps.put(ConfigurationConstants.CONFIG_PASSWORD, "");
+    derbyProps.put(ConfigurationConstants.CONFIG_USERNAME, "sa");
+
+    File tmpDir = new File("target/test");
+    tmpDir.mkdirs();
+
+    // Use a temp file to create a temporary directory
+    File tempFile = File.createTempFile("temp", "_db", tmpDir);
+    String absFileName = tempFile.getCanonicalPath();
+    tempFile.delete();
+
+    derbyDbDir = new File(absFileName + "_dir");
+
+    if (!derbyDbDir.exists()) {
+      derbyDbDir.mkdirs();
+    }
+
+    derbyProps.put(ConfigurationConstants.CONFIG_URL,
+        "jdbc:derby:" + derbyDbDir.getCanonicalPath() + "/db;create=true");
+
+    LOGGER.info("Derby Properties: " + derbyProps);
+  }
+
+  @Test
+  public void testDerbySetup() {
+    JdbcChannelProviderImpl jdbcProviderImpl =
+        new JdbcChannelProviderImpl();
+
+    jdbcProviderImpl.initialize(derbyProps);
+
+    Transaction tx = jdbcProviderImpl.getTransaction();
+    tx.begin();
+    tx.begin();
+    tx.close();
+    tx.close();
+    jdbcProviderImpl.close();
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestTransactionIsolationLevelEnum.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestTransactionIsolationLevelEnum.java?rev=1182209&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestTransactionIsolationLevelEnum.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestTransactionIsolationLevelEnum.java Wed Oct 12 05:39:45 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * The purpose of this test is to guard against accidental backward
+ * compatibility problem since the string representation so of
+ * TransactionIsolation enum are a public interface used in configuration.
+ */
+public class TestTransactionIsolationLevelEnum {
+
+  public static final String TX_READ_UNCOMMITTED = "READ_UNCOMMITTED";
+  public static final String TX_READ_COMMITTED = "READ_COMMITTED";
+  public static final String TX_REPEATABLE_READ = "REPEATABLE_READ";
+  public static final String TX_SERIALIZABLE = "SERIALIZABLE";
+
+  private Map<String, TransactionIsolation> enumMap =
+      new HashMap<String, TransactionIsolation>();
+
+  @Before
+  public void setUp() {
+    enumMap.clear();
+    enumMap.put(TX_READ_UNCOMMITTED, TransactionIsolation.READ_UNCOMMITTED);
+    enumMap.put(TX_READ_COMMITTED, TransactionIsolation.READ_COMMITTED);
+    enumMap.put(TX_REPEATABLE_READ, TransactionIsolation.REPEATABLE_READ);
+    enumMap.put(TX_SERIALIZABLE, TransactionIsolation.SERIALIZABLE);
+  }
+
+  @Test
+  public void testReverseLookup() {
+    for (String key : enumMap.keySet()) {
+      TransactionIsolation txIsolation = enumMap.get(key);
+      TransactionIsolation lookupTxIsolation =
+          TransactionIsolation.valueOf(key);
+      String lookupTxIsolationName = lookupTxIsolation.getName();
+
+      Assert.assertEquals(lookupTxIsolationName, lookupTxIsolation.toString());
+      Assert.assertSame(txIsolation, lookupTxIsolation);
+      Assert.assertEquals(key, lookupTxIsolationName);
+
+      TransactionIsolation lookupTxIsolation2 =
+          TransactionIsolation.getByName(key.toLowerCase());
+      Assert.assertSame(txIsolation, lookupTxIsolation2);
+    }
+  }
+}

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java?rev=1182209&r1=1182208&r2=1182209&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java Wed Oct 12 05:39:45 2011
@@ -166,7 +166,7 @@ public class FlumeConfiguration {
     private final Map<String, ComponentConfiguration> channelConfigMap;
 
 
-    public AgentConfiguration(String agentName) {
+    private AgentConfiguration(String agentName) {
       this.agentName = agentName;
       sourceConfigMap = new HashMap<String, ComponentConfiguration>();
       sinkConfigMap = new HashMap<String, ComponentConfiguration>();
@@ -524,7 +524,7 @@ public class FlumeConfiguration {
 
      private final Map<String, String> configuration;
 
-     public ComponentConfiguration(String componentName, boolean hasRunner) {
+     private ComponentConfiguration(String componentName, boolean hasRunner) {
        this.componentName = componentName;
        this.hasRunner = hasRunner;
        if (hasRunner) {
@@ -607,7 +607,7 @@ public class FlumeConfiguration {
     private final String componentName;
     private final String configKey;
 
-    public ComponentNameAndConfigKey(String name, String configKey) {
+    private ComponentNameAndConfigKey(String name, String configKey) {
       this.componentName = name;
       this.configKey = configKey;
     }

Modified: incubator/flume/branches/flume-728/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/pom.xml?rev=1182209&r1=1182208&r2=1182209&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/pom.xml (original)
+++ incubator/flume/branches/flume-728/pom.xml Wed Oct 12 05:39:45 2011
@@ -628,6 +628,18 @@ limitations under the License.
         <version>2.0.1</version>
       </dependency>
 
+      <dependency>
+        <groupId>commons-dbcp</groupId>
+        <artifactId>commons-dbcp</artifactId>
+        <version>1.4</version>
+      </dependency>
+
+     <dependency>
+       <groupId>org.apache.derby</groupId>
+       <artifactId>derby</artifactId>
+       <version>10.8.1.2</version>
+     </dependency>
+
     </dependencies>
   </dependencyManagement>
 



Mime
View raw message