activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r493226 - in /incubator/activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/quick/ activemq-jpa-store/ activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/ activemq-jpa-store/src/main/java/org/apache/activemq/st...
Date Fri, 05 Jan 2007 23:05:41 GMT
Author: chirino
Date: Fri Jan  5 15:05:40 2007
New Revision: 493226

URL: http://svn.apache.org/viewvc?view=rev&rev=493226
Log:
Added a JPA based ReferenceStoreAdapter implementation

Added:
    incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java
    incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java
    incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java
    incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java
    incubator/activemq/trunk/activemq-jpa-store/src/main/resources/META-INF/persistence.xml
      - copied, changed from r493115, incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml
    incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java
    incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java
    incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java
    incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java
    incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/
    incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/
    incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/
    incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/
    incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/
    incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml
    incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
    incubator/activemq/trunk/activemq-jpa-store/pom.xml
    incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java
    incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java
    incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java?view=diff&rev=493226&r1=493225&r2=493226
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java Fri Jan  5 15:05:40 2007
@@ -88,7 +88,7 @@
 
     private UsageManager usageManager;
 
-    private long cleanupInterval = 1000 * 1/10;
+    private long cleanupInterval = 1000 * 60;
     private long checkpointInterval = 1000 * 10;
     
     private int maxCheckpointWorkers = 1;

Modified: incubator/activemq/trunk/activemq-jpa-store/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/pom.xml?view=diff&rev=493226&r1=493225&r2=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/pom.xml (original)
+++ incubator/activemq/trunk/activemq-jpa-store/pom.xml Fri Jan  5 15:05:40 2007
@@ -73,11 +73,39 @@
       <artifactId>commons-pool</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.xbean</groupId>
+      <artifactId>xbean-spring</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring</artifactId>
+      <optional>true</optional>
+    </dependency>
 
   </dependencies>
   
   <build>
-    <plugins>
+    <plugins>
+
+      <plugin>
+        <groupId>org.apache.xbean</groupId>
+        <artifactId>maven-xbean-plugin</artifactId>
+        <version>${xbean-version}</version>
+        <executions>
+          <execution>
+            <configuration>
+              <namespace>http://activemq.org/activemq-jpa-store/config/1.0</namespace>
+              <schema>target/xbean/activemq-jpa-store.xsd</schema>
+            </configuration>
+            <goals>
+              <goal>mapping</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>

Modified: incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java?view=diff&rev=493226&r1=493225&r2=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java Fri Jan  5 15:05:40 2007
@@ -1,3 +1,20 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.store.jpa;
 
 import java.io.IOException;
@@ -59,12 +76,6 @@
 		adapter.commitEntityManager(context,manager);		
 	}
 
-	public void addMessageReference(ConnectionContext context,
-			MessageId messageId, long expirationTime, String messageRef)
-			throws IOException {
-		throw new IOException("Not implemented.");
-	}
-
 	public ActiveMQDestination getDestination() {
 		return destination;
 	}
@@ -105,10 +116,7 @@
 		return rc.intValue();
 	}
 
-	public String getMessageReference(MessageId identity) throws IOException {
-		throw new IOException("Not implemented.");
-	}
-
+	@SuppressWarnings("unchecked")
 	public void recover(MessageRecoveryListener container) throws Exception {
 		EntityManager manager = adapter.beginEntityManager(null);
 		try {
@@ -125,6 +133,7 @@
 		adapter.commitEntityManager(null,manager);
 	}
 
+	@SuppressWarnings("unchecked")
 	public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
 		
 		EntityManager manager = adapter.beginEntityManager(null);

Modified: incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java?view=diff&rev=493226&r1=493225&r2=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java Fri Jan  5 15:05:40 2007
@@ -48,7 +48,7 @@
  * An implementation of {@link PersistenceAdapter} that uses JPA to
  * store it's messages.
  * 
- * @org.apache.xbean.XBean
+ * @org.apache.xbean.XBean element="jpaPersistenceAdapter"
  * 
  * @version $Revision: 1.17 $
  */
@@ -149,7 +149,7 @@
 		commitEntityManager(null,manager);		
 	}
 
-	public Set getDestinations() {
+	public Set<ActiveMQDestination> getDestinations() {
 		HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
 		
 		EntityManager manager = beginEntityManager(null);
@@ -188,12 +188,6 @@
 	}
 
 	public void setUsageManager(UsageManager usageManager) {
-	}
-
-	public void setUseExternalMessageReferences(boolean enable) {
-		if( enable ) {
-			throw new IllegalArgumentException("This persistence adapter does not support externa message references");
-		}
 	}
 
 	public void start() throws Exception {

Added: incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java?view=auto&rev=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java (added)
+++ incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java Fri Jan  5 15:05:40 2007
@@ -0,0 +1,210 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.jpa.model.StoredMessageReference;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+public class JPAReferenceStore implements ReferenceStore {
+	
+	protected final JPAPersistenceAdapter adapter;
+	protected final WireFormat wireFormat;
+	protected final ActiveMQDestination destination;
+	protected final String destinationName;
+    protected AtomicLong lastMessageId = new AtomicLong(-1);
+    
+	public JPAReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
+		this.adapter = adapter;
+		this.destination = destination;
+		this.destinationName = destination.getQualifiedName();
+		this.wireFormat = this.adapter.getWireFormat();
+	}
+	
+	public ActiveMQDestination getDestination() {
+		return destination;
+	}
+
+	public void addMessage(ConnectionContext context, Message message) throws IOException {
+		throw new RuntimeException("Use addMessageReference instead");
+	}
+	
+	public Message getMessage(MessageId identity) throws IOException {
+		throw new RuntimeException("Use addMessageReference instead");
+	}
+	
+	public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
+		EntityManager manager = adapter.beginEntityManager(context);
+		try {
+			
+			StoredMessageReference sm = new StoredMessageReference();
+			sm.setDestination(destinationName);
+			sm.setId(messageId.getBrokerSequenceId());
+			sm.setMessageId(messageId.toString());
+			sm.setExiration(data.getExpiration());
+			sm.setFileId(data.getFileId());
+			sm.setOffset(data.getOffset());
+		
+			manager.persist(sm);
+			
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(context,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(context,manager);		
+	}
+
+	public ReferenceData getMessageReference(MessageId identity) throws IOException {
+		ReferenceData rc=null;
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			StoredMessageReference message=null;
+			if( identity.getBrokerSequenceId()!= 0 ) {
+				message = manager.find(StoredMessageReference.class, identity.getBrokerSequenceId());			
+			} else {
+				Query query = manager.createQuery("select m from StoredMessageReference m where m.messageId=?1");
+				query.setParameter(1, identity.toString());
+				message = (StoredMessageReference) query.getSingleResult();
+			}
+			if( message !=null ) {
+				rc = new ReferenceData();
+				rc.setExpiration(message.getExiration());
+				rc.setFileId(message.getFileId());
+				rc.setOffset(message.getOffset());
+			}
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+		return rc;
+	}
+
+	public int getMessageCount() throws IOException {
+		Long rc;
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			Query query = manager.createQuery("select count(m) from StoredMessageReference m");
+			rc = (Long) query.getSingleResult();
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+		return rc.intValue();
+	}
+
+	public void recover(MessageRecoveryListener container) throws Exception {
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 order by m.id asc");
+			query.setParameter(1, destinationName);
+			for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
+				MessageId id = new MessageId(m.getMessageId());
+				id.setBrokerSequenceId(m.getId());
+				container.recoverMessageReference(id);
+	        }
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+		
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			
+			Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
+			query.setParameter(1, destinationName);
+			query.setParameter(2, lastMessageId.get());
+			query.setMaxResults(maxReturned);
+			int count = 0;
+			for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
+				MessageId id = new MessageId(m.getMessageId());
+				id.setBrokerSequenceId(m.getId());				
+				listener.recoverMessageReference(id);
+				lastMessageId.set(m.getId());
+				count++;
+				if( count >= maxReturned ) { 
+					return;
+				}
+	        }
+
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	public void removeAllMessages(ConnectionContext context) throws IOException {
+		EntityManager manager = adapter.beginEntityManager(context);
+		try {
+			Query query = manager.createQuery("delete from StoredMessageReference m where m.destination=?1");
+			query.setParameter(1, destinationName);
+			query.executeUpdate();
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(context,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(context,manager);
+	}
+
+	public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+		EntityManager manager = adapter.beginEntityManager(context);
+		try {
+			Query query = manager.createQuery("delete from StoredMessageReference m where m.id=?1");
+			query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId());
+			query.executeUpdate();
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(context,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(context,manager);
+	}
+
+	public void resetBatching() {
+        lastMessageId.set(-1);
+	}
+
+	public void setUsageManager(UsageManager usageManager) {
+	}
+
+	public void start() throws Exception {
+	}
+
+	public void stop() throws Exception {
+	}
+}

Added: incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java?view=auto&rev=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java (added)
+++ incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java Fri Jan  5 15:05:40 2007
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStoreAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.util.IOExceptionSupport;
+
+/**
+ * An implementation of {@link ReferenceStoreAdapter} that uses JPA to
+ * store it's message references.
+ * 
+ * @org.apache.xbean.XBean element="jpaReferenceStoreAdapter"
+ * 
+ * @version $Revision: 1.17 $
+ */
+public class JPAReferenceStoreAdapter extends JPAPersistenceAdapter implements ReferenceStoreAdapter {
+
+	@Override
+	public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+		throw new RuntimeException("Use createQueueReferenceStore instead.");
+	}
+	
+	@Override
+	public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+		throw new RuntimeException("Use createTopicReferenceStore instead.");
+	}
+	
+	public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
+		JPAReferenceStore rc =  new JPAReferenceStore(this, destination);
+        return rc;
+	}
+
+	public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
+		JPATopicReferenceStore rc =  new JPATopicReferenceStore(this, destination);
+        return rc;
+	}
+	
+
+	public void deleteAllMessages() throws IOException {
+		EntityManager manager = beginEntityManager(null);
+		try {
+			Query query = manager.createQuery("delete from StoredMessageReference m");
+			query.executeUpdate();
+			query = manager.createQuery("delete from StoredSubscription ss");
+			query.executeUpdate();
+		} catch (Throwable e) {
+			rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		commitEntityManager(null,manager);		
+	}
+
+	public Set<ActiveMQDestination> getDestinations() {
+		HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
+		
+		EntityManager manager = beginEntityManager(null);
+		try {
+			Query query = manager.createQuery("select distinct m.destination from StoredMessageReference m");
+			for (String dest : (List<String>)query.getResultList()) {
+				rc.add(ActiveMQDestination.createDestination(dest,ActiveMQDestination.QUEUE_TYPE));
+	        }
+		} catch (RuntimeException e) {
+			rollbackEntityManager(null,manager);
+			throw e;
+		}
+		commitEntityManager(null,manager);		
+		return rc;
+	}
+
+	public long getLastMessageBrokerSequenceId() throws IOException {
+		long rc=0;
+		EntityManager manager = beginEntityManager(null);
+		try {
+			Query query = manager.createQuery("select max(m.id) from StoredMessageReference m");
+			Long t = (Long) query.getSingleResult();
+			if( t != null ) {
+				rc = t;
+			}
+		} catch (Throwable e) {
+			rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		commitEntityManager(null,manager);		
+		return rc;
+	}	
+
+	public Set<Integer> getReferenceFileIdsInUse() throws IOException {
+		HashSet<Integer> rc=null;
+		EntityManager manager = beginEntityManager(null);
+		try {
+			Query query = manager.createQuery("select distinct m.fileId from StoredMessageReference m");
+			rc=new HashSet<Integer>((List<Integer>)query.getResultList());
+		} catch (Throwable e) {
+			rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		commitEntityManager(null,manager);
+		return rc;
+	}
+
+}

Modified: incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java?view=diff&rev=493226&r1=493225&r2=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java Fri Jan  5 15:05:40 2007
@@ -1,3 +1,20 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.store.jpa;
 
 import java.io.IOException;

Added: incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java?view=auto&rev=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java (added)
+++ incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java Fri Jan  5 15:05:40 2007
@@ -0,0 +1,250 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.store.jpa.model.StoredMessageReference;
+import org.apache.activemq.store.jpa.model.StoredSubscription;
+import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId;
+import org.apache.activemq.util.IOExceptionSupport;
+
+public class JPATopicReferenceStore extends JPAReferenceStore implements TopicReferenceStore {
+    private Map<SubscriptionId,AtomicLong> subscriberLastMessageMap=new ConcurrentHashMap<SubscriptionId,AtomicLong>();
+
+	public JPATopicReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
+		super(adapter, destination);
+	}
+
+	public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+		EntityManager manager = adapter.beginEntityManager(context);
+		try {
+			StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+			ss.setLastAckedId(messageId.getBrokerSequenceId());
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(context,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(context,manager);
+	}
+
+	public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {			
+			StoredSubscription ss = new StoredSubscription();
+			ss.setClientId(clientId);
+			ss.setSubscriptionName(subscriptionName);
+			ss.setDestination(destinationName);
+			ss.setSelector(selector);
+			ss.setLastAckedId(-1);
+			
+			if( !retroactive ) {
+				Query query = manager.createQuery("select max(m.id) from StoredMessageReference m");
+				Long rc = (Long) query.getSingleResult();
+				if( rc != null ) {
+					ss.setLastAckedId(rc);
+				}
+			}
+			
+			manager.persist(ss);
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {			
+			StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+			manager.remove(ss);
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
+		Query query = manager.createQuery(
+				"select ss from StoredSubscription ss " +
+				"where ss.clientId=?1 " +
+				"and ss.subscriptionName=?2 " +
+				"and ss.destination=?3");
+		query.setParameter(1, clientId);
+		query.setParameter(2, subscriptionName);
+		query.setParameter(3, destinationName);
+		List<StoredSubscription> resultList = query.getResultList();
+		if( resultList.isEmpty() )
+			return null;
+		return resultList.get(0); 
+	}
+
+	public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+		SubscriptionInfo rc[];
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
+			
+			Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
+			query.setParameter(1, destinationName);
+			for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) {
+				SubscriptionInfo info = new SubscriptionInfo();
+				info.setClientId(ss.getClientId());
+				info.setDestination(destination);
+				info.setSelector(ss.getSelector());
+				info.setSubcriptionName(ss.getSubscriptionName());
+				l.add(info);
+	        }
+			
+			rc = new SubscriptionInfo[l.size()];
+			l.toArray(rc);
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);		
+		return rc;
+	}
+
+	public int getMessageCount(String clientId, String subscriptionName) throws IOException {
+		Long rc;
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {	
+			Query query = manager.createQuery(
+					"select count(m) FROM StoredMessageReference m, StoredSubscription ss " +
+					"where ss.clientId=?1 " +
+					"and   ss.subscriptionName=?2 " +
+					"and   ss.destination=?3 " +
+					"and   m.destination=ss.destination and m.id > ss.lastAckedId");
+			query.setParameter(1, clientId);
+			query.setParameter(2, subscriptionName);
+			query.setParameter(3, destinationName);
+	        rc = (Long) query.getSingleResult();	        
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+		return rc.intValue();
+	}
+
+	public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+		SubscriptionInfo rc=null;
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {			
+			StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+			if( ss != null ) {
+				rc = new SubscriptionInfo();
+				rc.setClientId(ss.getClientId());
+				rc.setDestination(destination);
+				rc.setSelector(ss.getSelector());
+				rc.setSubcriptionName(ss.getSubscriptionName());
+			}
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+		return rc;
+	}
+
+	public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			SubscriptionId id = new SubscriptionId();
+			id.setClientId(clientId);
+			id.setSubscriptionName(subscriptionName);
+			id.setDestination(destinationName);
+	
+			AtomicLong last=subscriberLastMessageMap.get(id);
+	        if(last==null){
+	    		StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+	            last=new AtomicLong(ss.getLastAckedId());
+	            subscriberLastMessageMap.put(id,last);
+	        }
+	        final AtomicLong lastMessageId=last;
+			
+			Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
+			query.setParameter(1, destinationName);
+			query.setParameter(2, lastMessageId.get());
+			query.setMaxResults(maxReturned);
+			int count = 0;
+			for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
+				MessageId mid = new MessageId(m.getMessageId());
+				mid.setBrokerSequenceId(m.getId());
+				listener.recoverMessageReference(mid);
+
+				lastMessageId.set(m.getId());
+				count++;
+				if( count >= maxReturned ) { 
+					return;
+				}
+	        }        
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+	
+			StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+			
+			Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
+			query.setParameter(1, destinationName);
+			query.setParameter(2, ss.getLastAckedId());
+			for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
+				MessageId mid = new MessageId(m.getMessageId());
+				mid.setBrokerSequenceId(m.getId());
+				listener.recoverMessageReference(mid);
+	        }
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	public void resetBatching(String clientId, String subscriptionName) {
+		SubscriptionId id = new SubscriptionId();
+		id.setClientId(clientId);
+		id.setSubscriptionName(subscriptionName);
+		id.setDestination(destinationName);
+        subscriberLastMessageMap.remove(id);
+	}
+
+}

Added: incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java?view=auto&rev=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java (added)
+++ incubator/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java Fri Jan  5 15:05:40 2007
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa.model;
+
+import javax.persistence.Basic;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+/** 
+ */
+@Entity()
+public class StoredMessageReference {
+	
+    @Id
+    private long id;
+	
+    @Basic(optional=false)
+    @Index(enabled=true, unique=false)
+    private String messageId;
+
+    @Basic(optional=false)
+    @Index(enabled=true, unique=false)
+    private String destination;
+
+    @Basic
+    @Index(enabled=false, unique=false)
+    private long exiration;
+
+    @Basic(optional=false)
+    @Index(enabled=false, unique=false)
+	private int fileId;
+
+    @Basic(optional=false)
+    @Index(enabled=false, unique=false)
+	private int offset;
+
+    public StoredMessageReference() {
+    }
+
+	public String getDestination() {
+		return destination;
+	}
+	public void setDestination(String destination) {
+		this.destination = destination;
+	}
+
+	public long getExiration() {
+		return exiration;
+	}
+	public void setExiration(long exiration) {
+		this.exiration = exiration;
+	}
+
+	public String getMessageId() {
+		return messageId;
+	}
+	public void setMessageId(String messageId) {
+		this.messageId = messageId;
+	}
+
+	public long getId() {
+		return id;
+	}
+	public void setId(long sequenceId) {
+		this.id = sequenceId;
+	}
+
+	public int getFileId() {
+		return fileId;
+	}
+	public void setFileId(int fileId) {
+		this.fileId = fileId;		
+	}
+
+	public int getOffset() {
+		return offset;
+	}
+	public void setOffset(int offset) {
+		this.offset = offset;
+	}
+
+}

Copied: incubator/activemq/trunk/activemq-jpa-store/src/main/resources/META-INF/persistence.xml (from r493115, incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml)
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/main/resources/META-INF/persistence.xml?view=diff&rev=493226&p1=incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml&r1=493115&p2=incubator/activemq/trunk/activemq-jpa-store/src/main/resources/META-INF/persistence.xml&r2=493226
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml (original)
+++ incubator/activemq/trunk/activemq-jpa-store/src/main/resources/META-INF/persistence.xml Fri Jan  5 15:05:40 2007
@@ -21,8 +21,6 @@
         <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
         <class>org.apache.activemq.store.jpa.model.StoredMessage</class>
         <class>org.apache.activemq.store.jpa.model.StoredSubscription</class>
-        <!-- 
-        <class>org.apache.activemq.store.jpa.model.StoredSubscription$SubscriptionId</class>
-         -->
+        <class>org.apache.activemq.store.jpa.model.StoredMessageReference</class>
     </persistence-unit>
 </persistence>

Added: incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java?view=auto&rev=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java (added)
+++ incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java Fri Jan  5 15:05:40 2007
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.springframework.core.io.ClassPathResource;
+
+/**
+ * 
+ * @version $Revision$
+ */
+public class JPAStoreLoadTester extends LoadTester  {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/jpabroker.xml"));
+        brokerFactory.afterPropertiesSet();
+        BrokerService broker =  brokerFactory.getBroker();
+        broker.setDeleteAllMessagesOnStartup(true);
+        return broker;
+    }
+    
+    public static Test suite() {
+        return suite(JPAStoreLoadTester.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+    
+}

Added: incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java?view=auto&rev=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java (added)
+++ incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java Fri Jan  5 15:05:40 2007
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.springframework.core.io.ClassPathResource;
+
+/**
+ * 
+ * @version $Revision$
+ */
+public class QuickJPAStoreLoadTester extends LoadTester  {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/quickjpabroker.xml"));
+        brokerFactory.afterPropertiesSet();
+        BrokerService broker =  brokerFactory.getBroker();
+        broker.setDeleteAllMessagesOnStartup(true);
+        return broker;
+    }
+    
+    public static Test suite() {
+        return suite(QuickJPAStoreLoadTester.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+    
+}

Added: incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java?view=auto&rev=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java (added)
+++ incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java Fri Jan  5 15:05:40 2007
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.util.Properties;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.RecoveryBrokerTest;
+import org.apache.activemq.store.jpa.JPAReferenceStoreAdapter;
+import org.apache.activemq.store.quick.QuickPersistenceAdapter;
+
+/**
+ * Used to verify that recovery works correctly against 
+ * 
+ * @version $Revision$
+ */
+public class QuickJPAStoreRecoveryBrokerTest extends RecoveryBrokerTest {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setDeleteAllMessagesOnStartup(true);
+        QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
+        
+        JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter();
+        Properties props = new Properties();
+        props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
+        props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
+        props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
+        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+        rfa.setEntityManagerProperties(props);
+        pa.setReferenceStoreAdapter(rfa);        
+        
+        service.setPersistenceAdapter(pa);
+        return service;
+    }
+    
+    protected BrokerService createRestartedBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
+
+        JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter();
+        Properties props = new Properties();
+        props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
+        props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
+        props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
+        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+        rfa.setEntityManagerProperties(props);
+        pa.setReferenceStoreAdapter(rfa);        
+
+        service.setPersistenceAdapter(pa);
+        return service;
+    }
+    
+    public static Test suite() {
+        return suite(QuickJPAStoreRecoveryBrokerTest.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Added: incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java?view=auto&rev=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java (added)
+++ incubator/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java Fri Jan  5 15:05:40 2007
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.util.Properties;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.XARecoveryBrokerTest;
+import org.apache.activemq.store.jpa.JPAReferenceStoreAdapter;
+import org.apache.activemq.store.quick.QuickPersistenceAdapter;
+
+/**
+ * Used to verify that recovery works correctly against 
+ * 
+ * @version $Revision$
+ */
+public class QuickJPAStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {
+
+    public static Test suite() {
+        return suite(QuickJPAStoreXARecoveryBrokerTest.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setDeleteAllMessagesOnStartup(true);
+        QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
+        
+        JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter();
+        Properties props = new Properties();
+        props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
+        props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
+        props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
+        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+        rfa.setEntityManagerProperties(props);
+        pa.setReferenceStoreAdapter(rfa);        
+        
+        service.setPersistenceAdapter(pa);
+        return service;
+    }
+    
+    protected BrokerService createRestartedBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
+        
+        JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter();
+        Properties props = new Properties();
+        props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
+        props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
+        props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
+        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+        rfa.setEntityManagerProperties(props);
+        pa.setReferenceStoreAdapter(rfa);        
+        
+        service.setPersistenceAdapter(pa);
+        return service;
+    }
+    
+}

Added: incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml?view=auto&rev=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml (added)
+++ incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml Fri Jan  5 15:05:40 2007
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans>
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker 
+  		brokerName="broker" 
+  		persistent="false" useJmx="false" 
+  		deleteAllMessagesOnStartup="true" 
+        xmlns="http://activemq.org/config/1.0" persistenceAdapter="#jpa">
+        
+     <transportConnectors>
+      <transportConnector uri="tcp://localhost:0"/>
+    </transportConnectors>
+
+  </broker>
+  
+  <bean class="org.apache.activemq.store.jpa.JPAPersistenceAdapter" id="jpa">
+  	<property name="entityManagerProperties">
+     	<props>
+			<prop key="openjpa.ConnectionDriverName">org.apache.derby.jdbc.EmbeddedDriver</prop>
+	        <prop key="openjpa.ConnectionURL">jdbc:derby:activemq-data/derby;create=true</prop>
+	        <prop key="openjpa.jdbc.SynchronizeMappings">buildSchema</prop>
+		 	<prop key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>
+     	</props>
+	</property>  	
+  </bean>
+
+</beans>

Added: incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml?view=auto&rev=493226
==============================================================================
--- incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml (added)
+++ incubator/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml Fri Jan  5 15:05:40 2007
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans>
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker 
+  		brokerName="broker" 
+  		persistent="false" useJmx="false" 
+  		deleteAllMessagesOnStartup="true" 
+        xmlns="http://activemq.org/config/1.0">
+        
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:0"/>
+    </transportConnectors>
+
+    <persistenceAdapter>
+      <quickPersistenceAdapter directory="${basedir}/target/activemq-data/quick-broker.db" referenceStoreAdapter="#jpa"/> 
+    </persistenceAdapter>
+  </broker>
+  
+  <bean class="org.apache.activemq.store.jpa.JPAReferenceStoreAdapter" id="jpa">
+  	<property name="entityManagerProperties">
+     	<props>
+			<prop key="openjpa.ConnectionDriverName">org.apache.derby.jdbc.EmbeddedDriver</prop>
+	        <prop key="openjpa.ConnectionURL">jdbc:derby:activemq-data/derby;create=true</prop>
+	        <prop key="openjpa.jdbc.SynchronizeMappings">buildSchema</prop>
+		 	<prop key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>
+     	</props>
+	</property>  	
+  </bean>
+  
+
+</beans>



Mime
View raw message