Return-Path: X-Original-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 22BF37B98 for ; Fri, 14 Oct 2011 09:14:11 +0000 (UTC) Received: (qmail 25509 invoked by uid 500); 14 Oct 2011 09:14:11 -0000 Delivered-To: apmail-incubator-flume-commits-archive@incubator.apache.org Received: (qmail 25475 invoked by uid 500); 14 Oct 2011 09:14:10 -0000 Mailing-List: contact flume-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: flume-dev@incubator.apache.org Delivered-To: mailing list flume-commits@incubator.apache.org Received: (qmail 25464 invoked by uid 99); 14 Oct 2011 09:14:09 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Oct 2011 09:14:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Oct 2011 09:14:02 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7CD6F23888CD; Fri, 14 Oct 2011 09:13:40 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1183252 - in /incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel: ./ src/main/java/org/apache/flume/channel/jdbc/impl/ src/test/java/org/apache/flume/channel/jdbc/ Date: Fri, 14 Oct 2011 09:13:39 -0000 To: flume-commits@incubator.apache.org From: arvind@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111014091340.7CD6F23888CD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: arvind Date: Fri Oct 14 09:13:39 2011 New Revision: 1183252 URL: http://svn.apache.org/viewvc?rev=1183252&view=rev Log: Implemented persistence of events for derby. Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/ (props changed) incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java Propchange: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/ ------------------------------------------------------------------------------ --- svn:ignore (original) +++ svn:ignore Fri Oct 14 09:13:39 2011 @@ -2,3 +2,4 @@ .project .settings target +derby.log Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java?rev=1183252&r1=1183251&r2=1183252&view=diff ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java (original) +++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java Fri Oct 14 09:13:39 2011 @@ -17,18 +17,24 @@ */ package org.apache.flume.channel.jdbc.impl; +import java.io.ByteArrayInputStream; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Set; import javax.sql.DataSource; import org.apache.flume.channel.jdbc.ConfigurationConstants; import org.apache.flume.channel.jdbc.JdbcChannelException; +import org.apache.flume.channel.jdbc.impl.PersistableEvent.HeaderEntry; +import org.apache.flume.channel.jdbc.impl.PersistableEvent.SpillableString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,12 +215,35 @@ public class DerbySchemaHandler implemen + "FOREIGN KEY (" + COLUMN_FLV_HEADER + ") REFERENCES " + TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + "))"; - public static final String COLUMN_LOOKUP_QUERY = - "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = " + public static final String COLUMN_LOOKUP_QUERY + = "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = " + "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND " + "SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE " + "SCHEMANAME = ? ))"; + public static final String STMT_INSERT_EVENT_BASE + = "INSERT INTO " + TABLE_FL_EVENT + " (" + + COLUMN_FLE_PAYLOAD + ", " + COLUMN_FLE_CHANNEL + ", " + + COLUMN_FLE_SPILL + ") VALUES ( ?, ?, ?)"; + + public static final String STMT_INSERT_EVENT_SPILL + = "INSERT INTO " + TABLE_FL_PLSPILL + " (" + + COLUMN_FLP_EVENT + ", " + COLUMN_FLP_SPILL + ") VALUES ( ?, ?)"; + + public static final String STMT_INSERT_HEADER_BASE + = "INSERT INTO " + TABLE_FL_HEADER + " (" + + COLUMN_FLH_EVENT + ", " + COLUMN_FLH_NAME + ", " + COLUMN_FLH_VALUE + + ", " + COLUMN_FLH_NMSPILL + ", " + COLUMN_FLH_VLSPILL + ") VALUES " + + "( ?, ?, ?, ?, ?)"; + + public static final String STMT_INSERT_HEADER_NAME_SPILL + = "INSERT INTO " + TABLE_FL_NMSPILL + " (" + + COLUMN_FLN_HEADER + ", " + COLUMN_FLN_SPILL + ") VALUES ( ?, ?)"; + + public static final String STMT_INSERT_HEADER_VALUE_SPILL + = "INSERT INTO " + TABLE_FL_VLSPILL + " (" + + COLUMN_FLV_HEADER + ", " + COLUMN_FLV_SPILL + ") VALUES ( ?, ?)"; + private final DataSource dataSource; protected DerbySchemaHandler(DataSource dataSource) { @@ -412,4 +441,201 @@ public class DerbySchemaHandler implemen } } } + + @Override + public void persistEvent(PersistableEvent pe, Connection connection) { + // First populate the main event table + byte[] basePayload = pe.getBasePayload(); + byte[] spillPayload = pe.getSpillPayload(); + boolean hasSpillPayload = (spillPayload != null); + String channelName = pe.getChannelName(); + + LOGGER.debug("Preparing insert event: " + pe); + + PreparedStatement baseEventStmt = null; + PreparedStatement spillEventStmt = null; + PreparedStatement baseHeaderStmt = null; + PreparedStatement headerNameSpillStmt = null; + PreparedStatement headerValueSpillStmt = null; + try { + baseEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_BASE, + Statement.RETURN_GENERATED_KEYS); + baseEventStmt.setBytes(1, basePayload); + baseEventStmt.setString(2, channelName); + baseEventStmt.setBoolean(3, hasSpillPayload); + + int baseEventCount = baseEventStmt.executeUpdate(); + if (baseEventCount != 1) { + throw new JdbcChannelException("Invalid update count on base " + + "event insert: " + baseEventCount); + } + // Extract event ID and set it + ResultSet eventIdResult = baseEventStmt.getGeneratedKeys(); + + if (!eventIdResult.next()) { + throw new JdbcChannelException("Unable to retrieive inserted event-id"); + } + + long eventId = eventIdResult.getLong(1); + pe.setEventId(eventId); + + // Persist the payload spill + if (hasSpillPayload) { + spillEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_SPILL); + spillEventStmt.setLong(1, eventId); + spillEventStmt.setBinaryStream(2, + new ByteArrayInputStream(spillPayload), spillPayload.length); + int spillEventCount = spillEventStmt.executeUpdate(); + if (spillEventCount != 1) { + throw new JdbcChannelException("Invalid update count on spill " + + "event insert: " + spillEventCount); + } + } + + // Persist the headers + List headers = pe.getHeaderEntries(); + if (headers != null && headers.size() > 0) { + List headerWithNameSpill = new ArrayList(); + List headerWithValueSpill = new ArrayList(); + + + baseHeaderStmt = connection.prepareStatement(STMT_INSERT_HEADER_BASE, + Statement.RETURN_GENERATED_KEYS); + Iterator it = headers.iterator(); + while (it.hasNext()) { + HeaderEntry entry = it.next(); + SpillableString name = entry.getName(); + SpillableString value = entry.getValue(); + baseHeaderStmt.setLong(1, eventId); + baseHeaderStmt.setString(2, name.getBase()); + baseHeaderStmt.setString(3, value.getBase()); + baseHeaderStmt.setBoolean(4, name.hasSpill()); + baseHeaderStmt.setBoolean(5, value.hasSpill()); + + int updateCount = baseHeaderStmt.executeUpdate(); + if (updateCount != 1) { + throw new JdbcChannelException("Unexpected update header count: " + + updateCount); + } + ResultSet headerIdResultSet = baseHeaderStmt.getGeneratedKeys(); + if (!headerIdResultSet.next()) { + throw new JdbcChannelException( + "Unable to retrieve inserted header id"); + } + long headerId = headerIdResultSet.getLong(1); + entry.setId(headerId); + + if (name.hasSpill()) { + headerWithNameSpill.add(entry); + } + + if (value.hasSpill()) { + headerWithValueSpill.add(entry); + } + } + + // Persist header name spills + if (headerWithNameSpill.size() > 0) { + LOGGER.debug("Number of headers with name spill: " + + headerWithNameSpill.size()); + + headerNameSpillStmt = + connection.prepareStatement(STMT_INSERT_HEADER_NAME_SPILL); + + for (HeaderEntry entry : headerWithNameSpill) { + String nameSpill = entry.getName().getSpill(); + + headerNameSpillStmt.setLong(1, entry.getId()); + headerNameSpillStmt.setString(2, nameSpill); + headerNameSpillStmt.addBatch(); + } + + int[] nameSpillUpdateCount = headerNameSpillStmt.executeBatch(); + if (nameSpillUpdateCount.length != headerWithNameSpill.size()) { + throw new JdbcChannelException("Unexpected update count for header " + + "name spills: expected " + headerWithNameSpill.size() + ", " + + "found " + nameSpillUpdateCount.length); + } + + for (int i = 0; i < nameSpillUpdateCount.length; i++) { + if (nameSpillUpdateCount[i] != 1) { + throw new JdbcChannelException("Unexpected update count for " + + "header name spill at position " + i + ", value: " + + nameSpillUpdateCount[i]); + } + } + } + + // Persist header value spills + if (headerWithValueSpill.size() > 0) { + LOGGER.debug("Number of headers with value spill: " + + headerWithValueSpill.size()); + + headerValueSpillStmt = + connection.prepareStatement(STMT_INSERT_HEADER_VALUE_SPILL); + + for(HeaderEntry entry : headerWithValueSpill) { + String valueSpill = entry.getValue().getSpill(); + + headerValueSpillStmt.setLong(1, entry.getId()); + headerValueSpillStmt.setString(2, valueSpill); + headerValueSpillStmt.addBatch(); + } + + int[] valueSpillUpdateCount = headerValueSpillStmt.executeBatch(); + if (valueSpillUpdateCount.length != headerWithValueSpill.size()) { + throw new JdbcChannelException("Unexpected update count for header " + + "value spills: expected " + headerWithValueSpill.size() + ", " + + "found " + valueSpillUpdateCount.length); + } + + for (int i = 0; i < valueSpillUpdateCount.length; i++) { + if (valueSpillUpdateCount[i] != 1) { + throw new JdbcChannelException("Unexpected update count for " + + "header value spill at position " + i + ", value: " + + valueSpillUpdateCount[i]); + } + } + } + } + } catch (SQLException ex) { + throw new JdbcChannelException("Unable to persist event: " + pe, ex); + } finally { + if (baseEventStmt != null) { + try { + baseEventStmt.close(); + } catch (SQLException ex) { + LOGGER.error("Unable to close base event statement", ex); + } + } + if (spillEventStmt != null) { + try { + spillEventStmt.close(); + } catch (SQLException ex) { + LOGGER.error("Unable to close spill event statement", ex); + } + } + if (baseHeaderStmt != null) { + try { + baseHeaderStmt.close(); + } catch (SQLException ex) { + LOGGER.error("Unable to close base header statement", ex); + } + } + if (headerNameSpillStmt != null) { + try { + headerNameSpillStmt.close(); + } catch (SQLException ex) { + LOGGER.error("Unable to close header name spill statement", ex); + } + } + if (headerValueSpillStmt != null) { + try { + headerValueSpillStmt.close(); + } catch (SQLException ex) { + LOGGER.error("Unable to close header value spill statement", ex); + } + } + } + } } Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1183252&r1=1183251&r2=1183252&view=diff ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original) +++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Fri Oct 14 09:13:39 2011 @@ -170,14 +170,15 @@ public class JdbcChannelProviderImpl imp } @Override - public void persistEvent(String channelName, Event event) { - PersistableEvent persistableEvent = new PersistableEvent(event); - Transaction tx = null; + public void persistEvent(String channel, Event event) { + PersistableEvent persistableEvent = new PersistableEvent(channel, event); + JdbcTransactionImpl tx = null; try { tx = getTransaction(); tx.begin(); // Persist the persistableEvent + schemaHandler.persistEvent(persistableEvent, tx.getConnection()); tx.commit(); } catch (Exception ex) { @@ -197,7 +198,7 @@ public class JdbcChannelProviderImpl imp } @Override - public Transaction getTransaction() { + public JdbcTransactionImpl getTransaction() { return txFactory.get(); } Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java?rev=1183252&r1=1183251&r2=1183252&view=diff ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java (original) +++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java Fri Oct 14 09:13:39 2011 @@ -154,4 +154,11 @@ public class JdbcTransactionImpl impleme } } } + + protected Connection getConnection() { + if (!active) { + throw new JdbcChannelException("Inactive transaction"); + } + return connection; + } } Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java?rev=1183252&r1=1183251&r2=1183252&view=diff ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java (original) +++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java Fri Oct 14 09:13:39 2011 @@ -1,5 +1,7 @@ package org.apache.flume.channel.jdbc.impl; +import java.sql.Connection; + import javax.sql.DataSource; public class MySQLSchemaHandler implements SchemaHandler { @@ -28,4 +30,10 @@ public class MySQLSchemaHandler implemen } + @Override + public void persistEvent(PersistableEvent pe, Connection connection) { + // TODO Auto-generated method stub + + } + } Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java?rev=1183252&r1=1183251&r2=1183252&view=diff ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java (original) +++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java Fri Oct 14 09:13:39 2011 @@ -29,20 +29,22 @@ import org.apache.flume.channel.jdbc.Con public class PersistableEvent { private long eventId; - private byte[] payload; - private byte[] spill; + private final String channel; + private byte[] basePayload; + private byte[] spillPayload; private List headers; - public PersistableEvent(Event event) { + public PersistableEvent(String channel, Event event) { + this.channel = channel; byte[] givenPayload = event.getBody(); if (givenPayload.length < ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD) { - payload = Arrays.copyOf(givenPayload, givenPayload.length); - spill = null; + basePayload = Arrays.copyOf(givenPayload, givenPayload.length); + spillPayload = null; } else { - payload = Arrays.copyOfRange(givenPayload, 0, + basePayload = Arrays.copyOfRange(givenPayload, 0, ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD); - spill = Arrays.copyOfRange(givenPayload, + spillPayload = Arrays.copyOfRange(givenPayload, ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD, givenPayload.length); } @@ -59,12 +61,13 @@ public class PersistableEvent { public byte[] getPayload() { byte[] result = null; - if (spill == null) { - result = Arrays.copyOf(payload, payload.length); + if (spillPayload == null) { + result = Arrays.copyOf(basePayload, basePayload.length); } else { - result = new byte[payload.length + spill.length]; - System.arraycopy(payload, 0, result, 0, payload.length); - System.arraycopy(spill, 0, result, payload.length, spill.length); + result = new byte[basePayload.length + spillPayload.length]; + System.arraycopy(basePayload, 0, result, 0, basePayload.length); + System.arraycopy(spillPayload, 0, result, + basePayload.length, spillPayload.length); } return result; @@ -75,35 +78,73 @@ public class PersistableEvent { if (headers != null) { headerMap = new HashMap(); for (HeaderEntry entry : headers) { - headerMap.put(entry.getName(), entry.getValue()); + headerMap.put(entry.getNameString(), entry.getValueString()); } } return headerMap; } - public static class HeaderEntry { + public String getChannelName() { + return channel; + } + + public byte[] getBasePayload() { + return this.basePayload; + } - private SpillableString nameString; - private SpillableString valueString; + public byte[] getSpillPayload() { + return this.spillPayload; + } + + protected void setEventId(long eventId) { + this.eventId = eventId; + } + + public List getHeaderEntries() { + return headers; + } + + protected static class HeaderEntry { + + private long headerId = -1L; + private SpillableString name; + private SpillableString value; public HeaderEntry(String name, String value) { - nameString = new SpillableString(name, + this.name = new SpillableString(name, ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD); - valueString = new SpillableString(value, + this.value = new SpillableString(value, ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD); } - public String getName() { - return nameString.getString(); + public String getNameString() { + return name.getString(); } - public String getValue() { - return valueString.getString(); + public SpillableString getName() { + return name; } + + public String getValueString() { + return value.getString(); + } + + public SpillableString getValue() { + return value; + } + + protected void setId(long headerId) { + this.headerId = headerId; + } + + public long getId() { + return headerId; + } + } - private static class SpillableString { + protected static class SpillableString { private String base; private String spill; @@ -144,5 +185,9 @@ public class PersistableEvent { } return base + spill; } + + public boolean hasSpill() { + return spill != null; + } } } Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java?rev=1183252&r1=1183251&r2=1183252&view=diff ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java (original) +++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java Fri Oct 14 09:13:39 2011 @@ -17,7 +17,7 @@ */ package org.apache.flume.channel.jdbc.impl; -import javax.sql.DataSource; +import java.sql.Connection; /** *

A handler for creating and validating database schema for use by @@ -42,4 +42,14 @@ public interface SchemaHandler { * @param connection the connection to create schema for. */ public void createSchemaObjects(); + + /** + * Inserts the given persistent event into the database. The connection that + * is passed into the handler has an ongoing transaction and therefore the + * SchemaHandler implementation must not close the connection. + * + * @param pe the event to persist + * @param connection the connection to use + */ + public void persistEvent(PersistableEvent pe, Connection connection); } Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java?rev=1183252&view=auto ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java (added) +++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java Fri Oct 14 09:13:39 2011 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.channel.jdbc; + +import java.util.Map; + +import org.apache.flume.Event; + +public class MockEvent implements Event { + + private final byte[] payload; + private final Map headers; + + public MockEvent(byte[] payload, Map headers) { + this.payload = payload; + this.headers = headers; + } + + @Override + public Map getHeaders() { + return headers; + } + + @Override + public void setHeaders(Map headers) { + + } + + @Override + public byte[] getBody() { + return payload; + } + + @Override + public void setBody(byte[] body) { + + } + +} Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java?rev=1183252&view=auto ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java (added) +++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java Fri Oct 14 09:13:39 2011 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.channel.jdbc; + +import java.util.Random; + +public final class MockEventUtils { + + private static final Random RANDOM = new Random(System.currentTimeMillis()); + + private static final String[] CHARS = new String[] { + "a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r", + "s","t","u","v","w","x","y","z", + "A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R", + "S","T","U","V","W","X","Y","Z", + "0","1","2","3","4","5","6","7","8","9", + "!","@","#","$","%","^","&","*","(",")", + "[","]","{","}",":",";","\"","'",",",".","<",">","?","/","\\","|", + }; + + public static byte[] generatePayload(int size) { + byte[] result = new byte[size]; + RANDOM.nextBytes(result); + return result; + } + + public static String generateHeaderString(int size) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < size; i++) { + int x = Math.abs(RANDOM.nextInt()); + int y = x % CHARS.length; + sb.append(CHARS[y]); + } + return sb.toString(); + } + + private MockEventUtils() { + // Disable explicit object creation + } + + +} Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1183252&r1=1183251&r2=1183252&view=diff ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (original) +++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Fri Oct 14 09:13:39 2011 @@ -1,11 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.flume.channel.jdbc; import java.io.File; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import junit.framework.Assert; +import org.apache.flume.Event; import org.apache.flume.Transaction; import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl; import org.junit.After; @@ -80,6 +100,36 @@ public class TestJdbcChannelProvider { provider = null; } + @Test + public void testPeristingEvents() { + provider = new JdbcChannelProviderImpl(); + provider.initialize(derbyProps); + + int nameLimit = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD; + int th = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD; + + byte[] s1 = MockEventUtils.generatePayload(th - 1); + Map m1 = new HashMap(); + m1.put(MockEventUtils.generateHeaderString(1), "one"); + m1.put(MockEventUtils.generateHeaderString(2), "twotwo"); + m1.put(MockEventUtils.generateHeaderString(3), "three"); + m1.put(MockEventUtils.generateHeaderString(100), "ahundred"); + m1.put(MockEventUtils.generateHeaderString(nameLimit - 21), "w"); + m1.put(MockEventUtils.generateHeaderString(nameLimit - 2), "x"); + m1.put(MockEventUtils.generateHeaderString(nameLimit - 1), "y"); + m1.put(MockEventUtils.generateHeaderString(nameLimit), "z"); + m1.put(MockEventUtils.generateHeaderString(nameLimit + 1), "a"); + m1.put(MockEventUtils.generateHeaderString(nameLimit + 2), "b"); + m1.put(MockEventUtils.generateHeaderString(nameLimit + 21), "c"); + + Event event = new MockEvent(s1, m1); + + provider.persistEvent("test", event); + + provider.close(); + provider = null; + } + @After public void tearDown() throws IOException { if (provider != null) { Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java?rev=1183252&r1=1183251&r2=1183252&view=diff ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java (original) +++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java Fri Oct 14 09:13:39 2011 @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.flume.channel.jdbc; import java.util.HashMap; @@ -15,105 +32,80 @@ public class TestPersistentEvent { private static final Logger LOGGER = LoggerFactory.getLogger(TestPersistentEvent.class); - private static final String[] CHARS = new String[] { - "a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r", - "s","t","u","v","w","x","y","z", - "A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R", - "S","T","U","V","W","X","Y","Z", - "0","1","2","3","4","5","6","7","8","9", - "!","@","#","$","%","^","&","*","(",")", - "[","]","{","}",":",";","\"","'",",",".","<",">","?","/","\\","|", - }; + @Test public void testMarshalling() { - Random rnd = new Random(System.currentTimeMillis()); int nameLimit = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD; int valLimit = ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD; - byte[] s1 = new byte[1]; - rnd.nextBytes(s1); + byte[] s1 = MockEventUtils.generatePayload(1); runTest(s1, null); - byte[] s2 = new byte[2]; - rnd.nextBytes(s2); + byte[] s2 = MockEventUtils.generatePayload(2); runTest(s2, new HashMap()); - byte[] s3 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD - 2]; - rnd.nextBytes(s3); + int th = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD; + + byte[] s3 = MockEventUtils.generatePayload(th - 2); Map m3 = new HashMap(); - m3.put(generateString(rnd, 1), generateString(rnd, 1)); + m3.put(MockEventUtils.generateHeaderString(1), + MockEventUtils.generateHeaderString(1)); runTest(s3, m3); - byte[] s4 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD - 1]; - rnd.nextBytes(s4); + byte[] s4 = MockEventUtils.generatePayload(th - 1); Map m4 = new HashMap(); - m4.put(generateString(rnd, nameLimit - 21), "w"); - m4.put(generateString(rnd, nameLimit - 2), "x"); - m4.put(generateString(rnd, nameLimit - 1), "y"); - m4.put(generateString(rnd, nameLimit), "z"); - m4.put(generateString(rnd, nameLimit + 1), "a"); - m4.put(generateString(rnd, nameLimit + 2), "b"); - m4.put(generateString(rnd, nameLimit + 21), "c"); + m4.put(MockEventUtils.generateHeaderString(nameLimit - 21), "w"); + m4.put(MockEventUtils.generateHeaderString(nameLimit - 2), "x"); + m4.put(MockEventUtils.generateHeaderString(nameLimit - 1), "y"); + m4.put(MockEventUtils.generateHeaderString(nameLimit), "z"); + m4.put(MockEventUtils.generateHeaderString(nameLimit + 1), "a"); + m4.put(MockEventUtils.generateHeaderString(nameLimit + 2), "b"); + m4.put(MockEventUtils.generateHeaderString(nameLimit + 21), "c"); runTest(s4, m4); - byte[] s5 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD]; - rnd.nextBytes(s5); + byte[] s5 = MockEventUtils.generatePayload(th); Map m5 = new HashMap(); - m5.put("w", generateString(rnd, valLimit - 21)); - m5.put("x", generateString(rnd, valLimit - 2)); - m5.put("y", generateString(rnd, valLimit - 1)); - m5.put("z", generateString(rnd, valLimit)); - m5.put("a", generateString(rnd, valLimit + 1)); - m5.put("b", generateString(rnd, valLimit + 2)); - m5.put("c", generateString(rnd, valLimit + 21)); + m5.put("w", MockEventUtils.generateHeaderString(valLimit - 21)); + m5.put("x", MockEventUtils.generateHeaderString(valLimit - 2)); + m5.put("y", MockEventUtils.generateHeaderString(valLimit - 1)); + m5.put("z", MockEventUtils.generateHeaderString(valLimit)); + m5.put("a", MockEventUtils.generateHeaderString(valLimit + 1)); + m5.put("b", MockEventUtils.generateHeaderString(valLimit + 2)); + m5.put("c", MockEventUtils.generateHeaderString(valLimit + 21)); runTest(s5, m5); - byte[] s6 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 1]; - rnd.nextBytes(s6); + byte[] s6 = MockEventUtils.generatePayload(th + 1); Map m6 = new HashMap(); - m6.put(generateString(rnd, nameLimit - 21), - generateString(rnd, valLimit - 21)); - m6.put(generateString(rnd, nameLimit - 2), - generateString(rnd, valLimit - 2)); - m6.put(generateString(rnd, nameLimit - 1), - generateString(rnd, valLimit - 1)); - m6.put(generateString(rnd, nameLimit), - generateString(rnd, valLimit)); - m6.put(generateString(rnd, nameLimit + 1), - generateString(rnd, valLimit + 1)); - m6.put(generateString(rnd, nameLimit + 2), - generateString(rnd, valLimit + 2)); - m6.put(generateString(rnd, nameLimit + 21), - generateString(rnd, valLimit + 21)); + m6.put(MockEventUtils.generateHeaderString(nameLimit - 21), + MockEventUtils.generateHeaderString(valLimit - 21)); + m6.put(MockEventUtils.generateHeaderString(nameLimit - 2), + MockEventUtils.generateHeaderString(valLimit - 2)); + m6.put(MockEventUtils.generateHeaderString(nameLimit - 1), + MockEventUtils.generateHeaderString(valLimit - 1)); + m6.put(MockEventUtils.generateHeaderString(nameLimit), + MockEventUtils.generateHeaderString(valLimit)); + m6.put(MockEventUtils.generateHeaderString(nameLimit + 1), + MockEventUtils.generateHeaderString(valLimit + 1)); + m6.put(MockEventUtils.generateHeaderString(nameLimit + 2), + MockEventUtils.generateHeaderString(valLimit + 2)); + m6.put(MockEventUtils.generateHeaderString(nameLimit + 21), + MockEventUtils.generateHeaderString(valLimit + 21)); runTest(s6, m6); - byte[] s7 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 2]; - rnd.nextBytes(s7); + byte[] s7 = MockEventUtils.generatePayload(th + 2); runTest(s7, null); - byte[] s8 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 27]; - rnd.nextBytes(s8); + byte[] s8 = MockEventUtils.generatePayload(th + 27); runTest(s8, null); } - private String generateString(Random rnd, int size) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < size; i++) { - int x = Math.abs(rnd.nextInt()); - int y = x % CHARS.length; - sb.append(CHARS[y]); - } - System.out.println("String: " + sb); - return sb.toString(); - } - - private void runTest(byte[] payload, Map headers) { - PersistableEvent pe = new PersistableEvent(new MockEvent(payload, headers)); + PersistableEvent pe = new PersistableEvent("test", + new MockEvent(payload, headers)); Assert.assertArrayEquals(payload, pe.getPayload()); Map h = pe.getHeaders(); if (h == null) { @@ -129,38 +121,4 @@ public class TestPersistentEvent { Assert.assertTrue(headers.size() == 0); } } - - - - private static class MockEvent implements Event { - - private final byte[] payload; - private final Map headers; - - private MockEvent(byte[] payload, Map headers) { - this.payload = payload; - this.headers = headers; - } - - @Override - public Map getHeaders() { - return headers; - } - - @Override - public void setHeaders(Map headers) { - - } - - @Override - public byte[] getBody() { - return payload; - } - - @Override - public void setBody(byte[] body) { - - } - - } }