Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/ActionActor.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/ActionActor.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/ActionActor.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/ActionActor.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hawtdb.internal; + +/** + * + * @author Hiram Chirino + */ +public class ActionActor extends Actor { + + private Action action; + + public ActionActor() { + } + + public ActionActor(String name, Action action) { + super(name); + this.action = action; + this.action.init(cast()); + } + + @SuppressWarnings("unchecked") + private A cast() { + return (A) this; + } + + public void run() throws Exception { + action.run(cast()); + } + + public Action getAction() { + return action; + } + + public void setAction(Action action) { + this.action = action; + this.action.init(cast()); + } + + +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Actor.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Actor.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Actor.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Actor.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * + * @author Hiram Chirino + */ +public abstract class Actor { + + public static enum ActorState { + STOPPED, + STARTING, + RUNNING, + STOPPING, + } + + protected String name; + protected Thread thread; + protected AtomicReference state = new AtomicReference(ActorState.STOPPED); + + public Actor() { + } + + public Actor(String name) { + this.name = name; + } + + public void start() { + if( state.compareAndSet(ActorState.STOPPED, ActorState.STARTING) ) { + thread = new Thread(new Runnable() { + public void run() { + if( state.compareAndSet(ActorState.STARTING, ActorState.RUNNING) ) { + try { + while( state.get()==ActorState.RUNNING ) { + Actor.this.run(); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + state.set(ActorState.STOPPED); + } + } + } + }, name); + thread.start(); + } + } + + public void stop() { + state.compareAndSet(ActorState.RUNNING, ActorState.STOPPING); + } + + public void waitForStop() throws InterruptedException { + stop(); + thread.join(); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public AtomicReference getState() { + return state; + } + + abstract protected void run() throws Exception; +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Benchmarker.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Benchmarker.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Benchmarker.java (added) +++ 
activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Benchmarker.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.metric.MetricCounter; +import org.apache.activemq.metric.Period; + +/** + * + * @author Hiram Chirino + */ +public class Benchmarker { + + public static abstract class BenchmarkAction implements Action { + public final MetricCounter success = new MetricCounter(); + public final MetricCounter failed = new MetricCounter(); + protected final String name; + + public BenchmarkAction(String name) { + this.name = name; + success.setName(name+" success"); + failed.setName(name+" failed"); + } + + public void init(A actor) { + } + + final public void run(final A actor) throws Exception { + try { + execute(actor); + success.increment(); + } catch (Throwable e) { + failed.increment(); + } + } + + abstract protected void execute(A actor) throws Exception; + + public String getName() { + return name; + } + + } + + int samples = 3; + int period = 1000 * 5; + String name; + + public void benchmark(ArrayList actors, ArrayList 
metrics) throws Exception { + for (Actor actor : actors) { + actor.start(); + } + try { + displayRates(metrics); + } finally { + for (Actor actor : actors) { + actor.stop(); + } + for (Actor actor : actors) { + actor.waitForStop(); + } + } + } + + protected void displayRates(List metrics) throws InterruptedException { + System.out.println("Gathering rates for: " + getName()); + for (int i = 0; i < samples; i++) { + Period p = new Period(); + Thread.sleep(period); + for (MetricCounter metric : metrics) { + System.out.println(metric.getRateSummary(p)); + metric.reset(); + } + } + } + + public int getSamples() { + return samples; + } + + public void setSamples(int samples) { + this.samples = samples; + } + + public int getPeriod() { + return period; + } + + public void setPeriod(int period) { + this.period = period; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/MapBenchmark.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/MapBenchmark.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/MapBenchmark.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/MapBenchmark.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.metric.MetricCounter; +import org.apache.hawtdb.internal.Benchmarker.BenchmarkAction; + +//import clojure.lang.IPersistentMap; +//import clojure.lang.PersistentHashMap; + +/** + * + * @author Hiram Chirino + */ +public class MapBenchmark { + + static interface Node { + public Node create(Map updates); + public void merge(Node node); + public Long get(Long i); + public void clear(); + } + + static class SynchornizedMapNode implements Node { + Map updates; + + public SynchornizedMapNode() { + } + + public SynchornizedMapNode(Map updates) { + this.updates = updates; + } + + public Node create(Map updates) { + return new SynchornizedMapNode(updates); + } + + public void merge(Node n) { + SynchornizedMapNode node = (SynchornizedMapNode) n; + synchronized(node) { + synchronized(this) { + node.updates.putAll(this.updates); + this.updates = node.updates; + } + } + } + + synchronized public Long get(Long i) { + return updates.get(i); + } + + synchronized public void clear() { + updates.clear(); + } + + } + +// @Test +// public void SynchornizedMapNode() throws Exception { +// nodeTest(new SynchornizedMapNode()); +// } +// +// static class ClojureMapNode implements Node { +// volatile IPersistentMap updates; +// +// public ClojureMapNode() { +// } +// +// public ClojureMapNode(Map updates) { +// this.updates = PersistentHashMap.create(updates); +// } +// +// public Node create(Map updates) { +// return new 
SynchornizedMapNode(updates); +// } +// +// public void merge(Node n) { +// ClojureMapNode node = (ClojureMapNode) n; +// this.updates = (IPersistentMap) node.updates.cons(this.updates); +// } +// +// synchronized public Long get(Long i) { +// return (Long) updates.valAt(i); +// } +// +// synchronized public void clear() { +// updates = PersistentHashMap.EMPTY; +// } +// +// } + +// @Test +// public void ClojureMapNode() throws Exception { +// nodeTest(new ClojureMapNode()); +// } + + public void nodeTest(final Node type) throws Exception { + final Node node = type.create(new HashMap()); + benchmark(5, new BenchmarkAction(type.getClass().getName()) { + @Override + protected void execute(MapActor actor) { + + // Test merges... + for (long i = 0; i < 100; i++) { + Map update = new HashMap(); + update.put(i, i); + node.merge(type.create(update)); + } + + // Do gets + for (long i = 0; i < 100; i++) { + node.get(i); + } + + node.clear(); + } + }); + } + + + +// +// @Test +// public void clojureMapManyReaders() throws Exception { +// IPersistentMap x = PersistentHashMap.EMPTY; +// for (long i = 0; i < 1000; i++) { +// x = x.assoc(i, i); +// } +// final IPersistentMap map = x; +// benchmark(5, new BenchmarkAction("ManyReaders:"+PersistentHashMap.class.getName()) { +// @Override +// protected void execute(MapActor actor) throws Exception { +// // Get them all.. +// for (long j = 0; j < 10; j++) { +// for (long i = 0; i < 1000; i++) { +// map.valAt(i); +// } +// } +// // Do a bunch of misses +// for (long i = 0; i < 1000; i++) { +// map.valAt(-i); +// } +// } +// }); +// } +// +// @Test +// public void javaMap() throws Exception { +// benchmark(1, new BenchmarkAction(HashMap.class.getName()) { +// @Override +// protected void execute(MapActor actor) { +// Map map = new HashMap(); +// for (long i = 0; i < 1000; i++) { +// map.put(i, i); +// } +// // Get them all.. 
+// for (long j = 0; j < 10; j++) { +// for (long i = 0; i < 1000; i++) { +// map.get(i); +// } +// } +// // Do a bunch of misses +// for (long i = 0; i < 1000; i++) { +// map.get(-i); +// } +// // remove 1/2 +// for (long i = 0; i < 1000; i++) { +// if ((i % 2) == 0) { +// map.remove(i); +// } +// } +// // Remove 1/2, miss 1/2 /w misses. +// for (long i = 0; i < 1000; i++) { +// map.remove(i); +// } +// } +// }); +// } +// +// @Test +// public void clojureMap() throws Exception { +// benchmark(1, new BenchmarkAction(PersistentHashMap.class.getName()) { +// @Override +// protected void execute(MapActor actor) throws Exception { +// IPersistentMap map = PersistentHashMap.EMPTY; +// for (long i = 0; i < 1000; i++) { +// map = map.assoc(i, i); +// } +// // Get them all.. +// for (long j = 0; j < 10; j++) { +// for (long i = 0; i < 1000; i++) { +// map.valAt(i); +// } +// } +// // Do a bunch of misses +// for (long i = 0; i < 1000; i++) { +// map.valAt(-i); +// } +// // remove 1/2 +// for (long i = 0; i < 1000; i++) { +// if ((i % 2) == 0) { +// map = map.without(i); +// } +// } +// // Remove 1/2, miss 1/2 /w misses. 
+// for (long i = 0; i < 1000; i++) { +// map = map.without(i); +// } +// } +// }); +// } +// + static class MapActor extends ActionActor { + } + + private void benchmark(int count, BenchmarkAction action) throws Exception { + Benchmarker benchmark = new Benchmarker(); + benchmark.setName(action.getName()); + ArrayList actors = createActors(count, action); + benchmark.benchmark(actors, createMetrics(action)); + } + + protected ArrayList createMetrics(BenchmarkAction action) { + ArrayList metrics = new ArrayList(); + metrics.add(action.success); + metrics.add(action.failed); + return metrics; + } + + protected ArrayList createActors(int count, Action action) { + ArrayList actors = new ArrayList(); + for (int i = 0; i < count; i++) { + MapActor actor = new MapActor(); + actor.setName("actor:"+i); + actor.setAction(action); + actors.add(actor); + } + return actors; + } +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal.index; + +import org.apache.activemq.util.buffer.Buffer; +import org.apache.activemq.util.marshaller.FixedBufferMarshaller; +import org.apache.activemq.util.marshaller.LongMarshaller; +import org.apache.hawtdb.api.Index; +import org.apache.hawtdb.api.Transaction; +import org.apache.hawtdb.internal.index.BTreeIndex; +import org.apache.hawtdb.internal.index.BTreeIndex.Factory; + +/** + * + * @author Hiram Chirino + */ +public class BTreeIndexBenchmark extends IndexBenchmark { + + protected Index createIndex(Transaction tx) { + Factory factory = new BTreeIndex.Factory(); + factory.setKeyMarshaller(LongMarshaller.INSTANCE); + factory.setValueMarshaller(new FixedBufferMarshaller(DATA.length)); + return factory.open(tx, tx.allocator().alloc(1)); + } + +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal.index; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.PrintWriter; +import java.text.NumberFormat; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.util.marshaller.LongMarshaller; +import org.apache.activemq.util.marshaller.StringMarshaller; +import org.apache.hawtdb.api.IndexVisitor; +import org.apache.hawtdb.api.Index; +import org.apache.hawtdb.internal.index.BTreeIndex; +import org.apache.hawtdb.internal.index.BTreeIndex.Factory; +import org.junit.Before; +import org.junit.Test; + +/** + * + * @author Hiram Chirino + */ +public class BTreeIndexTest extends IndexTestSupport { + + private NumberFormat nf; + + @Before + protected void setUp() throws Exception { + nf = NumberFormat.getIntegerInstance(); + nf.setMinimumIntegerDigits(6); + nf.setGroupingUsed(false); + } + + @Override + protected Index createIndex(int page) { + Factory factory = new Factory(); + factory.setKeyMarshaller(StringMarshaller.INSTANCE); + factory.setValueMarshaller(LongMarshaller.INSTANCE); + if( page==-1 ) { + return factory.create(tx, tx.allocator().alloc(1)); + } else { + return factory.open(tx, page); + } + } + + /** + * Yeah, the current implementation does NOT try to 
balance the tree. Here is + * a test case showing that it gets out of balance. + * + * @throws Exception + */ + public void disabled_testTreeBalancing() throws Exception { + createPageFileAndIndex((short) 100); + + BTreeIndex index = ((BTreeIndex)this.index); + + doInsert(50); + + int minLeafDepth = index.getMinLeafDepth(); + int maxLeafDepth = index.getMaxLeafDepth(); + assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1); + + // Remove some of the data + doRemove(16); + minLeafDepth = index.getMinLeafDepth(); + maxLeafDepth = index.getMaxLeafDepth(); + + System.out.println( "min:"+minLeafDepth ); + System.out.println( "max:"+maxLeafDepth ); + index.printStructure(new PrintWriter(System.out)); + + assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1); + + tx.commit(); + } + + @Test + public void testPruning() throws Exception { + createPageFileAndIndex((short)100); + + BTreeIndex index = ((BTreeIndex)this.index); + + int minLeafDepth = index.getMinLeafDepth(); + int maxLeafDepth = index.getMaxLeafDepth(); + assertEquals(1, minLeafDepth); + assertEquals(1, maxLeafDepth); + + doInsert(1000); + + minLeafDepth = index.getMinLeafDepth(); + maxLeafDepth = index.getMaxLeafDepth(); + assertTrue("Depth of tree grew", minLeafDepth > 1); + assertTrue("Depth of tree grew", maxLeafDepth > 1); + + // Remove the data. + doRemove(1000); + minLeafDepth = index.getMinLeafDepth(); + maxLeafDepth = index.getMaxLeafDepth(); + + assertEquals(1, minLeafDepth); + assertEquals(1, maxLeafDepth); + } + + @Test + public void testIteration() throws Exception { + createPageFileAndIndex((short)100); + + BTreeIndex index = ((BTreeIndex)this.index); + + // Insert in reverse order.. + doInsertReverse(1000); + + reloadIndex(); + tx.commit(); + + // BTree should iterate it in sorted order. 
+ int counter=0; + for (Map.Entry entry : index) { + assertEquals(key(counter),entry.getKey()); + assertEquals(counter,(long)entry.getValue()); + counter++; + } + } + + + @Test + public void testVisitor() throws Exception { + createPageFileAndIndex((short)100); + BTreeIndex index = ((BTreeIndex)this.index); + + // Insert in reverse order.. + doInsert(1000); + + reloadIndex(); + tx.commit(); + + // BTree should iterate it in sorted order. + + index.visit(new IndexVisitor(){ + public boolean isInterestedInKeysBetween(String first, String second) { + return true; + } + public void visit(List keys, List values) { + } + public boolean isSatiated() { + return false; + } + }); + + } + + void doInsertReverse(int count) throws Exception { + for (int i = count-1; i >= 0; i--) { + index.put(key(i), (long)i); + tx.commit(); + } + } + /** + * Overriding so that this generates keys that are the worst case for the BTree. Keys that + * always insert to the end of the BTree. + */ + @Override + protected String key(int i) { + return "key:"+nf.format(i); + } +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal.index; + +import org.apache.activemq.util.buffer.Buffer; +import org.apache.activemq.util.marshaller.FixedBufferMarshaller; +import org.apache.activemq.util.marshaller.LongMarshaller; +import org.apache.hawtdb.api.Index; +import org.apache.hawtdb.api.Transaction; +import org.apache.hawtdb.internal.index.HashIndex.Factory; + + +/** + * + * @author Hiram Chirino + */ +public class HashIndexBenchmark extends IndexBenchmark { + + protected Index createIndex(Transaction tx) { + Factory factory = new Factory(); + factory.setKeyMarshaller(LongMarshaller.INSTANCE); + factory.setValueMarshaller(new FixedBufferMarshaller(DATA.length)); + return factory.open(tx, tx.allocator().alloc(1)); + } + +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexTest.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexTest.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexTest.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal.index; + +import org.apache.activemq.util.marshaller.LongMarshaller; +import org.apache.activemq.util.marshaller.StringMarshaller; +import org.apache.hawtdb.api.Index; +import org.apache.hawtdb.internal.index.HashIndex.Factory; + + +/** + * + * @author Hiram Chirino + */ +public class HashIndexTest extends IndexTestSupport { + + @Override + protected Index createIndex(int page) { + Factory factory = new Factory(); + factory.setKeyMarshaller(StringMarshaller.INSTANCE); + factory.setValueMarshaller(LongMarshaller.INSTANCE); + if( page==-1 ) { + return factory.create(tx, tx.allocator().alloc(1)); + } else { + return factory.open(tx, page); + } + } + +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java (added) +++ 
activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal.index; + +import java.util.Random; + +import org.apache.activemq.util.buffer.Buffer; +import org.apache.hawtdb.api.Index; +import org.apache.hawtdb.api.Transaction; +import org.apache.hawtdb.internal.Action; +import org.apache.hawtdb.internal.Benchmarker.BenchmarkAction; +import org.apache.hawtdb.internal.page.ConcurrentPageFile; +import org.apache.hawtdb.internal.page.TransactionActor; +import org.apache.hawtdb.internal.page.TransactionBenchmarker; +import org.junit.Test; + +/** + * + * @author Hiram Chirino + */ +public abstract class IndexBenchmark { + + static final public byte[] DATA = new byte[8]; + + class IndexActor extends TransactionActor { + public Random random; + public Index index; + + public void setName(String name) { + super.setName(name); + this.random = new Random(name.hashCode()); + } + + @Override + public void setTx(Transaction tx) { + super.setTx(tx); + index = createIndex(tx); + } + } + + TransactionBenchmarker benchmark = new TransactionBenchmarker() { + protected 
IndexActor createActor(ConcurrentPageFile pageFile, Action action, int i) { + return new IndexActor(); + }; + }; + + + @Test + public void insert() throws Exception { + benchmark.benchmark(1, new BenchmarkAction("insert") { + long counter=0; + @Override + protected void execute(IndexActor actor) { + actor.index.put(counter++, new Buffer(DATA)); + } + }); + } + + + abstract protected Index createIndex(Transaction tx); + +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexTestSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexTestSupport.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexTestSupport.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexTestSupport.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hawtdb.internal.index; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.File; +import java.io.IOException; + +import org.apache.hawtdb.api.Index; +import org.apache.hawtdb.api.Transaction; +import org.apache.hawtdb.internal.page.ConcurrentPageFile; +import org.apache.hawtdb.internal.page.ConcurrentPageFileFactory; +import org.junit.After; +import org.junit.Test; + + +/** + * Tests an Index + * + * @author Hiram Chirino + */ +public abstract class IndexTestSupport { + + private ConcurrentPageFileFactory pff; + private ConcurrentPageFile pf; + protected Index index; + protected Transaction tx; + + + protected ConcurrentPageFileFactory createConcurrentPageFileFactory() { + ConcurrentPageFileFactory rc = new ConcurrentPageFileFactory(); + rc.setFile(new File("target/test-data/" + getClass().getName() + ".db")); + return rc; + } + + @After + protected void tearDown() throws Exception { + if( pf!=null ) { + pff.close(); + pff = null; + } + } + + abstract protected Index createIndex(int page); + + private static final int COUNT = 10000; + + public void createPageFileAndIndex(short pageSize) throws Exception { + pff = createConcurrentPageFileFactory(); + pff.setPageSize(pageSize); + pff.getFile().delete(); + pff.open(); + pf = pff.getConcurrentPageFile(); + tx = pf.tx(); + index = createIndex(-1); + + } + + protected void reloadAll() { + int page = index.getPage(); + pff.close(); + pff.open(); + pf = pff.getConcurrentPageFile(); + tx = pf.tx(); + index = createIndex(page); + } + + protected void reloadIndex() { + int page = index.getPage(); + tx.commit(); + index = createIndex(page); + } + + @Test + public void testIndexOperations() throws Exception { + createPageFileAndIndex((short) 500); + reloadIndex(); + doInsert(COUNT); + reloadIndex(); + tx.commit(); + checkRetrieve(COUNT); + doRemove(COUNT); + reloadIndex(); + tx.commit(); + doInsert(COUNT); + doRemoveHalf(COUNT); + 
doInsertHalf(COUNT); + reloadIndex(); + tx.commit(); + checkRetrieve(COUNT); + } + + void doInsert(int count) throws Exception { + for (int i = 0; i < count; i++) { + index.put(key(i), (long)i); + } + tx.commit(); + } + + protected String key(int i) { + return "key:"+i; + } + + void checkRetrieve(int count) throws IOException { + for (int i = 0; i < count; i++) { + Long item = index.get(key(i)); + assertNotNull("Key missing: "+key(i), item); + } + } + + void doRemoveHalf(int count) throws Exception { + for (int i = 0; i < count; i++) { + if (i % 2 == 0) { + assertNotNull("Expected remove to return value for index "+i, index.remove(key(i))); + } + } + tx.commit(); + } + + void doInsertHalf(int count) throws Exception { + for (int i = 0; i < count; i++) { + if (i % 2 == 0) { + index.put(key(i), (long)i); + } + } + tx.commit(); + } + + void doRemove(int count) throws Exception { + for (int i = 0; i < count; i++) { + assertNotNull("Expected remove to return value for index "+i, index.remove(key(i))); + } + tx.commit(); + for (int i = 0; i < count; i++) { + Long item = index.get(key(i)); + assertNull(item); + } + } + + void doRemoveBackwards(int count) throws Exception { + for (int i = count - 1; i >= 0; i--) { + index.remove(key(i)); + } + tx.commit(); + for (int i = 0; i < count; i++) { + Long item = index.get(key(i)); + assertNull(item); + } + } + +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/io/MemoryMappedFileTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/io/MemoryMappedFileTest.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/io/MemoryMappedFileTest.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/io/MemoryMappedFileTest.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,73 
@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal.io; + +import java.io.File; +import java.io.IOException; + +import org.apache.hawtdb.internal.io.MemoryMappedFile; +import org.junit.Assert; + + +/** + * @author Hiram Chirino + */ +public class MemoryMappedFileTest { + + @org.junit.Test + public void basicOps() throws IOException { + File file = new File("target/foo.data"); + file.delete(); + + MemoryMappedFile mmf = new MemoryMappedFile(file, 1024*1024*100); + + int PAGE_SIZE = 1024*4; + int LAST_PAGE = 100; + + byte expect[] = createData(PAGE_SIZE); + + mmf.write(0, expect); + mmf.write(LAST_PAGE *PAGE_SIZE, expect); + + // Validate data on the first page. + byte actual[] = new byte[PAGE_SIZE]; + mmf.read(0, actual); + Assert.assertEquals('a', actual[0]); + Assert.assertEquals('a', actual[26]); + Assert.assertEquals('z', actual[26+25]); + + // Validate data on the last page. 
+ actual = new byte[PAGE_SIZE]; + mmf.read(PAGE_SIZE*LAST_PAGE, actual); + Assert.assertEquals('a', actual[0]); + Assert.assertEquals('a', actual[26]); + Assert.assertEquals('z', actual[26+25]); + + mmf.sync(); + mmf.close(); + + } + + private byte[] createData(int size) { + byte[] rc = new byte[size]; + for (int i = 0; i < rc.length; i++) { + rc[i] = (byte) ('a'+(i%26)); + } + return rc; + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/journal/JournalTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/journal/JournalTest.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/journal/JournalTest.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/journal/JournalTest.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hawtdb.internal.journal; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + +import org.apache.activemq.util.buffer.Buffer; +import org.apache.hawtdb.internal.journal.Journal; + +/** + * + * @author Hiram Chirino + */ +public class JournalTest extends TestCase { + protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4; + + Journal dataManager; + File dir; + + @Override + public void setUp() throws Exception { + dir = new File("target/tests/DataFileAppenderTest"); + dir.mkdirs(); + dataManager = new Journal(); + dataManager.setDirectory(dir); + configure(dataManager); + dataManager.start(); + } + + protected void configure(Journal dataManager) { + } + + @Override + public void tearDown() throws Exception { + dataManager.close(); + deleteFilesInDirectory(dir); + dir.delete(); + } + + private void deleteFilesInDirectory(File directory) { + File[] files = directory.listFiles(); + for (int i=0; i= 0 + // as the Thread created in DataFileAppender.enqueue() may not have caught up. 
+ assertTrue("queued data is written", latch.await(5, TimeUnit.SECONDS)); + } + + public void testBatchWriteCallbackCompleteAfterClose() throws Exception { + final int iterations = 10; + final CountDownLatch latch = new CountDownLatch(iterations); + Buffer data = new Buffer("DATA".getBytes()); + for (int i=0; iHiram Chirino + */ +public class ConcurrentPageFileTest { + + private ConcurrentPageFileFactory pff; + + private ConcurrentPageFile pf; + + protected ConcurrentPageFileFactory createConcurrentPageFileFactory() { + ConcurrentPageFileFactory rc = new ConcurrentPageFileFactory(); + rc.setFile(new File("target/test-data/" + getClass().getName() + ".db")); + return rc; + } + + @Before + public void setUp() throws Exception { + pff = createConcurrentPageFileFactory(); + pff.getFile().delete(); + pff.open(); + pf = pff.getConcurrentPageFile(); + } + + @After + public void tearDown() throws Exception { + pff.close(); + } + + protected void reload() { + pff.close(); + pff.open(); + pf = pff.getConcurrentPageFile(); + } + + protected int store(Paged tx, String value) throws IOException { + int pageId = tx.allocator().alloc(1); + store(tx, pageId, value); + return pageId; + } + + protected void store(Paged tx, int page, String value) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeUTF(value); + os.close(); + tx.write(page, new Buffer(baos.toByteArray())); + } catch (IOException e) { + throw new IOPagingException(e); + } + } + + protected String load(Paged paged, int page) { + try { + Buffer buffer = new Buffer(pff.getPageSize()); + paged.read(page, buffer); + ByteArrayInputStream bais = new ByteArrayInputStream(buffer.data, buffer.offset, buffer.length); + DataInputStream is = new DataInputStream(bais); + return is.readUTF(); + } catch (IOException e) { + throw new IOPagingException(e); + } + } + + private final class StringEncoderDecoder implements EncoderDecoder { + public String 
load(Paged paged, int page) { + return ConcurrentPageFileTest.this.load(paged, page); + } + public List store(Paged paged, int page, String value) { + ConcurrentPageFileTest.this.store(paged, page, value); + return Collections.emptyList(); + } + public void remove(Paged paged, int page) { + } + } + + @Test + public void cacheAPI() throws IOException, ClassNotFoundException { + + // Setup some pages that will be getting updated. + Transaction tx = pf.tx(); + StringEncoderDecoder ENCODER = new StringEncoderDecoder(); + tx.put(ENCODER, tx.allocator().alloc(1), "Hello"); + tx.put(ENCODER, tx.allocator().alloc(1), "World"); + tx.commit(); + + reload(); + tx = pf.tx(); + + assertEquals("Hello", tx.get(ENCODER, 0)); + assertEquals("World", tx.get(ENCODER, 1)); + + } + + + @Test + public void cacheAPIConflictingUpdateFails() throws IOException, ClassNotFoundException { + + // Setup some pages that will be getting updated. + Transaction tx1 = pf.tx(); + StringEncoderDecoder ENCODER = new StringEncoderDecoder(); + tx1.put(ENCODER, tx1.allocator().alloc(1), "Hello"); + tx1.put(ENCODER, tx1.allocator().alloc(1), "World"); + tx1.commit(); + + tx1.put(ENCODER, 0, "Change 1"); + + // Now commit a change to page 0 + Transaction tx2 = pf.tx(); + assertEquals("Hello", tx2.get(ENCODER, 0)); // We don't see tx1's change... + tx2.put(ENCODER, 0, "Change 2"); + assertEquals("Change 2", tx2.get(ENCODER, 0)); // We can see our own change.. + tx2.commit(); + + // Tx1 still does not see tx2's change... + assertEquals("Change 1", tx1.get(ENCODER, 0)); + + try { + tx1.commit(); + fail("expected OptimisticUpdateException"); + } catch (OptimisticUpdateException expected) { + } + + } + + @Test + public void conflictingUpdateFails() throws IOException, ClassNotFoundException { + + // Setup some pages that will be getting updated. 
+ Transaction tx1 = pf.tx(); + assertEquals(0, store(tx1, "Hello")); + assertEquals(1, store(tx1, "World")); + tx1.commit(); + + // Start a transaction that updates page 0 + tx1 = pf.tx(); + store(tx1, 0, "Change 1"); + + // Now commit a change to page 0 + Transaction tx2 = pf.tx(); + assertEquals("Hello", load(tx2, 0)); // We don't see tx1's change... + store(tx2, 0, "Change 2"); + assertEquals("Change 2", load(tx2, 0)); // We can see our own change.. + tx2.commit(); + + // Tx1 still does not see tx2's change... + assertEquals("Change 1", load(tx1, 0)); + + try { + tx1.commit(); + fail("expected OptimisticUpdateException"); + } catch (OptimisticUpdateException expected) { + } + + } + + @Test + public void pagesNotDirectlyUpdated() throws IOException, ClassNotFoundException { + // New allocations get stored in the final positions. + Transaction tx = pf.tx(); + assertEquals(0, store(tx, "Hello")); + assertEquals(1, store(tx, "World")); + + // It should be on the page file already.. + assertEquals("Hello", load(pff.getPageFile(), 0)); + assertEquals("World", load(pff.getPageFile(), 1)); + tx.commit(); + + // Apply the updates. + pf.flush(); + pf.applyRedos(); + + // Should still be there.. + assertEquals("Hello", load(pff.getPageFile(), 0)); + assertEquals("World", load(pff.getPageFile(), 1)); + + // Update the existing pages.. + store(tx, 0, "Good"); + store(tx, 1, "Bye"); + tx.commit(); + + // A subsequent transaction can read the update. + assertEquals("Good", load(tx, 0)); + assertEquals("Bye", load(tx, 1)); + tx.commit(); + + // But the pages should not be updated until the transaction gets + // applied. + assertEquals("Hello", load(pff.getPageFile(), 0)); + assertEquals("World", load(pff.getPageFile(), 1)); + + // Apply them + pf.flush(); + pf.applyRedos(); + + // We should see them now. 
+ assertEquals("Good", load(pff.getPageFile(), 0)); + assertEquals("Bye", load(pff.getPageFile(), 1)); + } + + @Test + public void crudOperations() throws IOException, ClassNotFoundException { + int COUNT = 10; + + ArrayList allocations = new ArrayList(); + HashSet expected = new HashSet(); + + // Insert some data into the page file. + Transaction tx = pf.tx(); + for (int i = 0; i < COUNT; i++) { + + int page = tx.allocator().alloc(1); + // Since the file is empty.. allocations should occur sequentially + assertEquals(i, page); + + allocations.add(page); + String value = "page:" + i; + store(tx, page, value); + expected.add(value); + tx.commit(); + } + + // Reload it.. . + reload(); + tx = pf.tx(); + + // Iterate it to make sure they are still there.. + HashSet actual = new HashSet(); + for (Integer page : allocations) { + actual.add((String) load(tx, page)); + } + assertEquals(expected, actual); + + // Remove the odd records.. + for (int i = 0; i < COUNT; i++) { + if (i % 2 == 0) { + break; + } + String t = "page:" + i; + expected.remove(t); + } + for (Integer page : new ArrayList(allocations)) { + String t = (String) load(tx, page); + if (!expected.contains(t)) { + tx.allocator().free(page, 1); + allocations.remove(page); + } + } + tx.commit(); + + // Reload it... + reload(); + tx = pf.tx(); + + // Iterate it to make sure the even records are still there.. + actual.clear(); + for (Integer page : allocations) { + String t = (String) load(tx, page); + actual.add(t); + } + assertEquals(expected, actual); + + // Update the records... + HashSet t = expected; + expected = new HashSet(); + for (String s : t) { + expected.add(s + ":updated"); + } + for (Integer page : allocations) { + String value = (String) load(tx, page); + store(tx, page, value + ":updated"); + } + tx.commit(); + + // Reload it... + reload(); + tx = pf.tx(); + + // Iterate it to make sure the updated records are still there.. 
+ actual.clear(); + for (Integer page : allocations) { + String value = (String) load(tx, page); + actual.add(value); + } + assertEquals(expected, actual); + + } + + @Test + public void testExtentStreams() throws IOException { + Transaction tx = pf.tx(); + ExtentOutputStream eos = new ExtentOutputStream(tx); + DataOutputStream os = new DataOutputStream(eos); + for (int i = 0; i < 10000; i++) { + os.writeUTF("Test string:" + i); + } + os.close(); + int page = eos.getPage(); + tx.commit(); + + // Reload the page file. + reload(); + tx = pf.tx(); + + ExtentInputStream eis = new ExtentInputStream(tx, page); + DataInputStream is = new DataInputStream(eis); + for (int i = 0; i < 10000; i++) { + assertEquals("Test string:" + i, is.readUTF()); + } + assertEquals(-1, is.read()); + is.close(); + } + + @Test + public void testAddRollback() throws IOException, ClassNotFoundException { + + HashSet expected = new HashSet(); + + // Insert some data into the page file. + Transaction tx = pf.tx(); + for (int i = 0; i < 100; i++) { + String t = "page:" + i; + int pageId = store(tx, t); + + // Rollback every other insert. + if (i % 2 == 0) { + // Rolled back tx's should have their allocated pages + // released.. + assertEquals(pageId, i / 2); + expected.add(t); + tx.commit(); + } else { + tx.rollback(); + } + + } + + // Reload it... + reload(); + tx = pf.tx(); + + // Iterate it to make sure they are still there.. 
+ HashSet actual = new HashSet(); + for (int i = 0; i < 100; i++) { + if (i % 2 == 0) { + String t = load(tx, i / 2); + actual.add(t); + } + } + assertEquals(expected, actual); + } +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ExtentTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ExtentTest.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ExtentTest.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ExtentTest.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hawtdb.internal.page; + +import static org.junit.Assert.assertEquals; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; + +import org.apache.hawtdb.internal.page.ExtentInputStream; +import org.apache.hawtdb.internal.page.ExtentOutputStream; +import org.apache.hawtdb.internal.page.PageFile; +import org.apache.hawtdb.internal.page.PageFileFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * + * @author Hiram Chirino + */ +public class ExtentTest { + + private PageFileFactory pff; + private PageFile paged; + + protected PageFileFactory createPageFileFactory() { + PageFileFactory rc = new PageFileFactory(); + rc.setMappingSegementSize(rc.getPageSize()*3); + rc.setFile(new File("target/test-data/"+getClass().getName()+".db")); + return rc; + } + + @Before + public void setUp() throws Exception { + pff = createPageFileFactory(); + pff.getFile().delete(); + pff.open(); + paged = pff.getPageFile(); + } + + @After + public void tearDown() throws Exception { + pff.close(); + } + + protected void reload() { + pff.close(); + pff.open(); + paged = pff.getPageFile(); + } + + + @Test + public void testExtentStreams() throws IOException { + ExtentOutputStream eos = new ExtentOutputStream(paged); + DataOutputStream os = new DataOutputStream(eos); + for (int i = 0; i < 10000; i++) { + os.writeUTF("Test string:" + i); + } + os.close(); + int page = eos.getPage(); + + assertEquals(0, page); + + // Reload the page file. 
+ reload(); + + ExtentInputStream eis = new ExtentInputStream(paged, page); + DataInputStream is = new DataInputStream(eis); + for (int i = 0; i < 10000; i++) { + assertEquals("Test string:" + i, is.readUTF()); + } + assertEquals(-1, is.read()); + is.close(); + } +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionActor.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionActor.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionActor.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionActor.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hawtdb.internal.page; + +import org.apache.hawtdb.api.Transaction; +import org.apache.hawtdb.internal.ActionActor; + +/** + * + * @author Hiram Chirino + */ +public class TransactionActor> extends ActionActor { + private Transaction tx; + + public void setTx(Transaction tx) { + this.tx = tx; + } + + public Transaction tx() { + return tx; + } +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmark.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmark.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmark.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmark.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hawtdb.internal.page; + +import java.util.Random; + +import org.apache.activemq.util.buffer.Buffer; +import org.apache.hawtdb.api.Transaction; +import org.apache.hawtdb.internal.Action; +import org.apache.hawtdb.internal.Benchmarker.BenchmarkAction; +import org.apache.hawtdb.internal.page.ConcurrentPageFile; +import org.apache.hawtdb.internal.page.ConcurrentPageFileFactory; +import org.apache.hawtdb.internal.page.TransactionBenchmarker.Callback; +import org.junit.Test; + +/** + * + * @author Hiram Chirino + */ +public class TransactionBenchmark { + + static private byte[] THE_DATA = new byte[1024 * 3]; + + static class RandomTxActor extends TransactionActor { + public Random random; + public void setName(String name) { + super.setName(name); + this.random = new Random(name.hashCode()); + } + } + + TransactionBenchmarker benchmark = new TransactionBenchmarker() { + protected RandomTxActor createActor(ConcurrentPageFile pageFile, Action action, int i) { + return new RandomTxActor(); + }; + }; + +// @Test +// public void append() throws Exception { +// benchmark.benchmark(1, new BenchmarkAction("append") { +// @Override +// protected void execute(RandomTxActor actor) { +// int page = actor.tx().allocator().alloc(1); +// actor.tx().write(page, new Buffer(THE_DATA)); +// actor.tx().commit(); +// } +// }); +// } + + @Test + public void update() throws Exception { + final int INITIAL_PAGE_COUNT = 1024 * 100; + preallocate(INITIAL_PAGE_COUNT); + benchmark.benchmark(1, new BenchmarkAction("update") { + @Override + protected void execute(RandomTxActor actor) { + int page = actor.random.nextInt(INITIAL_PAGE_COUNT); + actor.tx().write(page, new Buffer(THE_DATA)); + actor.tx().commit(); + } + }); + } + + + @Test + public void read() throws Exception { + final int INITIAL_PAGE_COUNT = 1024 * 100; + preallocate(INITIAL_PAGE_COUNT); + benchmark.benchmark(1, new BenchmarkAction("read") { + @Override + protected void execute(RandomTxActor actor) { + int page 
= actor.random.nextInt(INITIAL_PAGE_COUNT); + actor.tx().read(page, new Buffer(THE_DATA)); + actor.tx().commit(); + } + }); + } + + + private void preallocate(final int INITIAL_PAGE_COUNT) { + benchmark.setSetup(new Callback(){ + public void run(ConcurrentPageFileFactory pff) throws Exception { + Transaction tx = pff.getConcurrentPageFile().tx(); + for (int i = 0; i < INITIAL_PAGE_COUNT; i++) { + int page = tx.allocator().alloc(1); + tx.write(page, new Buffer(THE_DATA)); + } + tx.commit(); + } + }); + } + +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmarker.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmarker.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmarker.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmarker.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hawtdb.internal.page; + +import java.io.File; +import java.util.ArrayList; + +import org.apache.activemq.metric.MetricCounter; +import org.apache.hawtdb.internal.Action; +import org.apache.hawtdb.internal.Benchmarker; +import org.apache.hawtdb.internal.Benchmarker.BenchmarkAction; +import org.apache.hawtdb.internal.page.ConcurrentPageFile; +import org.apache.hawtdb.internal.page.ConcurrentPageFileFactory; + +/** + * + * @author Hiram Chirino + */ +public class TransactionBenchmarker> { + + public interface Callback { + public void run(ConcurrentPageFileFactory pff) throws Exception; + } + + private Callback setup; + private Callback tearDown; + + public void benchmark(int actorCount, BenchmarkAction action) throws Exception { + ConcurrentPageFileFactory pff = new ConcurrentPageFileFactory(); + pff.setFile(new File("target/test-data/" + getClass().getName() + ".db")); + pff.getFile().delete(); + pff.open(); + try { + if( setup!=null ) { + setup.run(pff); + } + ConcurrentPageFile pf = pff.getConcurrentPageFile(); + Benchmarker benchmark = new Benchmarker(); + benchmark.setName(action.getName()); + ArrayList actors = createActors(pf, actorCount, action); + benchmark.benchmark(actors, createMetrics(action)); + } finally { + try { + if( tearDown!=null ) { + tearDown.run(pff); + } + } finally { + pff.close(); + } + } + } + + protected ArrayList createMetrics(BenchmarkAction action) { + ArrayList metrics = new ArrayList(); + metrics.add(action.success); + metrics.add(action.failed); + return metrics; + } + + protected ArrayList createActors(ConcurrentPageFile pageFile, int count, Action action) { + ArrayList actors = new ArrayList(); + for (int i = 0; i < count; i++) { + A actor = createActor(pageFile, action, i); + actor.setName("actor:"+i); + actor.setAction(action); + actor.setTx(pageFile.tx()); + actors.add(actor); + } + return actors; + } + + @SuppressWarnings("unchecked") + protected A createActor(ConcurrentPageFile pageFile, Action action, 
int i) { + return (A) new TransactionActor(); + } + + public Callback getSetup() { + return setup; + } + + public void setSetup(Callback setup) { + this.setup = setup; + } + + public Callback getTearDown() { + return tearDown; + } + + public void setTearDown(Callback tearDown) { + this.tearDown = tearDown; + } + + +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/util/RangesTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/util/RangesTest.java?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/util/RangesTest.java (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/util/RangesTest.java Thu Oct 15 17:04:11 2009 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hawtdb.internal.util; + +import static org.apache.hawtdb.internal.util.Ranges.range; +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; + +import org.apache.hawtdb.internal.util.Ranges; +import org.apache.hawtdb.internal.util.Ranges.Range; +import org.junit.Test; + +/** + * + * @author Hiram Chirino + */ +public class RangesTest { + + @Test + public void test() { + + Ranges ranges = new Ranges(); + + // Example of a simple range merges.. + ranges.add(0, 5); + ranges.add(15, 5); + ranges.add(5,10); + assertEquals(ranges(range(0,20)), ranges.toArrayList()); + + // Remove which splits an existing range into 2. + ranges.remove(5,10); + assertEquals(ranges(range(0,5),range(15,20)), ranges.toArrayList()); + + // overlapping add... + ranges.add(4,12); + assertEquals(ranges(range(0,20)), ranges.toArrayList()); + + // Removes are idempotent + ranges.remove(5,10); + assertEquals(ranges(range(0,5),range(15,20)), ranges.toArrayList()); + ranges.remove(5,10); + assertEquals(ranges(range(0,5),range(15,20)), ranges.toArrayList()); + + // Adds are idempotent + ranges.add(5,10); + assertEquals(ranges(range(0,20)), ranges.toArrayList()); + ranges.add(5,10); + assertEquals(ranges(range(0,20)), ranges.toArrayList()); + } + + ArrayList ranges(Range... 
args) { + ArrayList rc = new ArrayList(); + for (Range range : args) { + rc.add(range); + } + return rc; + } + +} Added: activemq/sandbox/activemq-flow/hawtdb/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/resources/log4j.properties?rev=825564&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/hawtdb/src/test/resources/log4j.properties (added) +++ activemq/sandbox/activemq-flow/hawtdb/src/test/resources/log4j.properties Thu Oct 15 17:04:11 2009 @@ -0,0 +1,27 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# +# The logging properties used during tests.. 
+# +log4j.rootLogger=WARN, console +log4j.logger.org.apache.hawtdb=INFO + +# CONSOLE appender, attached to the root logger above +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n Modified: activemq/sandbox/activemq-flow/pom.xml URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=825564&r1=825563&r2=825564&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/pom.xml (original) +++ activemq/sandbox/activemq-flow/pom.xml Thu Oct 15 17:04:11 2009 @@ -127,6 +127,7 @@ activemq-store activemq-transport activemq-util + hawtdb kahadb activemq-protobuf activemq-client @@ -843,6 +844,7 @@ + com.sun.tools.jxc.maven2 maven-jaxb-schemagen-plugin