Return-Path: X-Original-To: apmail-incubator-connectors-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-connectors-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6EA2C9507 for ; Sun, 18 Dec 2011 08:27:25 +0000 (UTC) Received: (qmail 51718 invoked by uid 500); 18 Dec 2011 08:27:23 -0000 Delivered-To: apmail-incubator-connectors-commits-archive@incubator.apache.org Received: (qmail 51671 invoked by uid 500); 18 Dec 2011 08:27:22 -0000 Mailing-List: contact connectors-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: connectors-dev@incubator.apache.org Delivered-To: mailing list connectors-commits@incubator.apache.org Received: (qmail 51663 invoked by uid 99); 18 Dec 2011 08:27:22 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 18 Dec 2011 08:27:22 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 18 Dec 2011 08:27:03 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 02DDB2388A9B; Sun, 18 Dec 2011 08:26:40 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1220349 [5/6] - in /incubator/lcf/branches/CONNECTORS-286/warthog-reimport: ./ lib/ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/warthog/ src/main/java/org/apache/warthog/api/ src/main... 
Date: Sun, 18 Dec 2011 08:26:34 -0000 To: connectors-commits@incubator.apache.org From: kwright@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111218082640.02DDB2388A9B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHKeyValue.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHKeyValue.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHKeyValue.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHKeyValue.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,33 @@ +/* $Id: WHKey.java 1205831 2011-11-24 13:57:15Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.warthog.transactionalkeyvaluestore; + +/** This object represents a value in a transactional key/value store. 
+* Pretty much any (serializable) object can be used, provided there +* is an available hash function and equals method. +*/ +public interface WHKeyValue +{ + public byte[] serializeObject(); + + /** Check if equals (classes must already agree) */ + public boolean isEquals(WHKeyValue value); + +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHTransaction.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHTransaction.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHTransaction.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHTransaction.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,55 @@ +/* $Id: WHTransaction.java 1204399 2011-11-21 08:44:53Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.warthog.transactionalkeyvaluestore; + +import org.apache.warthog.api.*; + +/** This interface represents a transaction that is in progress. +*/ +public interface WHTransaction +{ + /** Set a value. May be null. */ + public void put(WHKey key, WHKeyValue value) + throws WHException; + + /** Get a value. Null returned if no value. */ + public WHKeyValue get(WHKey key) + throws WHException; + + /** Check to see if this transaction has become inconsistent. + * If so, a WHDeadlockException is thrown. + */ + public void check() + throws WHException; + + /** Commit this transaction. If the transaction is inconsistent, + * a WHDeadlockException will be thrown. */ + public void commit() + throws WHException; + + /** Abandon this transaction. + * This is called as a nicety to free any resources associated with the + * transaction. The implementation should also be robust as far as + * freeing resources if this method is NOT called, but might perform + * the necessary logic in a finalizer at an arbitrary time. 
+ */ + public void abandon(); + +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHTransactionalStore.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHTransactionalStore.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHTransactionalStore.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/main/java/org/apache/warthog/transactionalkeyvaluestore/WHTransactionalStore.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,32 @@ +/* $Id: WHTransactionalStore.java 1199794 2011-11-09 15:28:08Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.warthog.transactionalkeyvaluestore; + +import org.apache.warthog.api.*; + +/** Interface describing a transactional key/value store. +*/ +public interface WHTransactionalStore +{ + /** Begin a transaction. 
*/ + public WHTransaction createTransaction() + throws WHException; + +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicByteKeyValueStore.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicByteKeyValueStore.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicByteKeyValueStore.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicByteKeyValueStore.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,110 @@ +/* $Id: InMemAtomicByteKeyValueStore.java 1206939 2011-11-28 00:37:54Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.warthog.tests; + +import org.apache.warthog.api.*; +import org.apache.warthog.bytekeyvalue.*; +import java.util.*; + +/** In-memory key value store, for testing. 
+*/ +public class InMemAtomicByteKeyValueStore implements WHAtomicByteKeyValueStore +{ + protected Map database; + + /** Constructor */ + public InMemAtomicByteKeyValueStore(int initialSize) + { + database = new HashMap(initialSize); + } + + /** Get a value */ + public byte[] get(byte[] key) + throws WHException + { + synchronized (database) + { + return database.get(new WrappedByteKey(key)); + } + } + + /** Check a bunch of values atomically for consistency. + *@param checkValues is a map of keys/values that must be unchanged. If any value is + * changed, a WHDeadlockException is thrown. + */ + public void check(WHByteKeyMap checkValues) + throws WHException + { + synchronized (database) + { + WHByteKeyIterator iterator = checkValues.iterator(); + while (iterator.hasNext()) + { + byte[] key = iterator.next(); + WrappedByteKey wrappedKey = new WrappedByteKey(key); + byte[] value = database.get(wrappedKey); + byte[] otherValue = checkValues.get(key); + if (value == null && otherValue == null) + continue; + if (value != null && otherValue != null) + { + if (value.length != otherValue.length) + throw new WHConcurrencyException(); + for (int i = 0 ; i < value.length ; i++) + { + if (value[i] != otherValue[i]) + throw new WHConcurrencyException(); + } + continue; + } + //System.out.println("Type of key in contention = "+key.getClass().getName()); + throw new WHConcurrencyException(); + } + } + } + + /** Set a bunch of values atomically. + *@param checkValues is a map of keys/values that must be unchanged in order for the + * commit to proceed. If these values are detected to have been changed, a WHDeadlockException + * will be thrown. Null values are permitted. + *@param setValues is a map of keys to set to specified new values. A null value implies removal of + * the key. 
+ */ + public void setAll(WHByteKeyMap checkValues, WHByteKeyMap setValues) + throws WHException + { + synchronized (database) + { + check(checkValues); + WHByteKeyIterator iterator = setValues.iterator(); + while (iterator.hasNext()) + { + byte[] key = iterator.next(); + WrappedByteKey wrappedKey = new WrappedByteKey(key); + byte[] value = setValues.get(key); + if (value == null) + database.remove(wrappedKey); + else + database.put(wrappedKey,value); + } + } + } + +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicKeyValueStore.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicKeyValueStore.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicKeyValueStore.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicKeyValueStore.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,102 @@ +/* $Id: InMemAtomicKeyValueStore.java 1206939 2011-11-28 00:37:54Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.warthog.tests; + +import org.apache.warthog.api.*; +import org.apache.warthog.keyvalue.*; +import org.apache.warthog.transactionalkeyvaluestore.*; +import java.util.*; + +/** In-memory key value store, for testing. +*/ +public class InMemAtomicKeyValueStore implements WHAtomicKeyValueStore +{ + protected Map database; + + /** Constructor */ + public InMemAtomicKeyValueStore(int initialSize) + { + database = new HashMap(initialSize); + } + + /** Get a value */ + public WHKeyValue get(WHKey key) + throws WHException + { + synchronized (database) + { + return database.get(new WrappedKey(key)); + } + } + + /** Check a bunch of values atomically for consistency. + *@param checkValues is a map of keys/values that must be unchanged. If any value is + * changed, a WHDeadlockException is thrown. + */ + public void check(WHKeyMap checkValues) + throws WHException + { + synchronized (database) + { + WHKeyIterator iterator = checkValues.iterator(); + while (iterator.hasNext()) + { + WHKey key = iterator.next(); + WrappedKey wrappedKey = new WrappedKey(key); + WHKeyValue value = database.get(wrappedKey); + WHKeyValue otherValue = checkValues.get(key); + if (value == null && otherValue == null) + continue; + if (value != null && otherValue != null && value.isEquals(otherValue)) + continue; + //System.out.println("Type of key in contention = "+key.getClass().getName()); + throw new WHConcurrencyException(); + } + } + } + + /** Set a bunch of values atomically. + *@param checkValues is a map of keys/values that must be unchanged in order for the + * commit to proceed. If these values are detected to have been changed, a WHDeadlockException + * will be thrown. Null values are permitted. + *@param setValues is a map of keys to set to specified new values. A null value implies removal of + * the key. 
+ */ + public void setAll(WHKeyMap checkValues, WHKeyMap setValues) + throws WHException + { + synchronized (database) + { + check(checkValues); + WHKeyIterator iterator = setValues.iterator(); + while (iterator.hasNext()) + { + WHKey key = iterator.next(); + WrappedKey wrappedKey = new WrappedKey(key); + WHKeyValue value = setValues.get(key); + if (value == null) + database.remove(wrappedKey); + else + database.put(wrappedKey,value); + } + } + } + +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicNativeKeyValueStore.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicNativeKeyValueStore.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicNativeKeyValueStore.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicNativeKeyValueStore.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,100 @@ +/* $Id: InMemAtomicNativeKeyValueStore.java 1208116 2011-11-29 22:36:26Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.warthog.tests; + +import org.apache.warthog.api.*; +import org.apache.warthog.keyvalue.*; +import org.apache.warthog.transactionalkeyvaluestore.*; +import java.util.*; + +/** In-memory key value store, native hash, for testing. +*/ +public class InMemAtomicNativeKeyValueStore implements WHAtomicKeyValueStore +{ + protected Map database; + + /** Constructor */ + public InMemAtomicNativeKeyValueStore(int initialSize) + { + database = new HashMap(initialSize); + } + + /** Get a value */ + public WHKeyValue get(WHKey key) + throws WHException + { + synchronized (database) + { + return database.get(key); + } + } + + /** Check a bunch of values atomically for consistency. + *@param checkValues is a map of keys/values that must be unchanged. If any value is + * changed, a WHDeadlockException is thrown. + */ + public void check(WHKeyMap checkValues) + throws WHException + { + synchronized (database) + { + WHKeyIterator iterator = checkValues.iterator(); + while (iterator.hasNext()) + { + WHKey key = iterator.next(); + WHKeyValue value = database.get(key); + WHKeyValue otherValue = checkValues.get(key); + if (value == null && otherValue == null) + continue; + if (value != null && otherValue != null && value.equals(otherValue)) + continue; + //System.out.println("Type of key in contention = "+key.getClass().getName()); + throw new WHConcurrencyException(); + } + } + } + + /** Set a bunch of values atomically. + *@param checkValues is a map of keys/values that must be unchanged in order for the + * commit to proceed. If these values are detected to have been changed, a WHDeadlockException + * will be thrown. Null values are permitted. + *@param setValues is a map of keys to set to specified new values. A null value implies removal of + * the key. 
+ */ + public void setAll(WHKeyMap checkValues, WHKeyMap setValues) + throws WHException + { + synchronized (database) + { + check(checkValues); + WHKeyIterator iterator = setValues.iterator(); + while (iterator.hasNext()) + { + WHKey key = iterator.next(); + WHKeyValue value = setValues.get(key); + if (value == null) + database.remove(key); + else + database.put(key,value); + } + } + } + +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,328 @@ +/* $Id: InMemAtomicNativeNonblockingKeyValueStore.java 1210934 2011-12-06 14:33:45Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.warthog.tests; + +import org.apache.warthog.api.*; +import org.apache.warthog.keyvalue.*; +import org.apache.warthog.transactionalkeyvaluestore.*; +import java.util.*; + +/** In-memory key value store, native hash, for testing. +*/ +public class InMemAtomicNativeNonblockingKeyValueStore implements WHAtomicKeyValueStore +{ + protected static int databaseCount = 1024; + protected static int databaseMask = databaseCount - 1; + + protected Map[] databaseArray; + + /** Constructor */ + public InMemAtomicNativeNonblockingKeyValueStore(int initialSize) + { + initialSize /= databaseCount; + databaseArray = new Map[databaseCount]; + for (int i = 0; i < databaseCount ; i++) + { + databaseArray[i] = new HashMap(initialSize); + } + } + + /** Get a value */ + public WHKeyValue get(WHKey key) + throws WHException + { + Map database = databaseArray[key.hashCode() & databaseMask]; + synchronized (database) + { + return (WHKeyValue)database.get(key); + } + } + + /** Check a bunch of values atomically for consistency. + *@param checkValues is a map of keys/values that must be unchanged. If any value is + * changed, a WHDeadlockException is thrown. + */ + public void check(WHKeyMap checkValues) + throws WHException + { + WHKeyIterator iterator = checkValues.iterator(); + while (iterator.hasNext()) + { + WHKey key = iterator.next(); + WHKeyValue value = get(key); + WHKeyValue otherValue = checkValues.get(key); + if (value == null && otherValue == null) + continue; + if (value != null && otherValue != null && value.equals(otherValue)) + continue; + //System.out.println("Type of key in contention = "+key.getClass().getName()); + throw new WHConcurrencyException(); + } + } + + /** Set a bunch of values atomically. + *@param checkValues is a map of keys/values that must be unchanged in order for the + * commit to proceed. 
If these values are detected to have been changed, a WHDeadlockException + * will be thrown. Null values are permitted. + *@param setValues is a map of keys to set to specified new values. A null value implies removal of + * the key. + */ + public void setAll(WHKeyMap checkValues, WHKeyMap setValues) + throws WHException + { + // First, get the identifier of the thread making the request + Long threadID = new Long(Thread.currentThread().getId()); + // Allocate a lock vector, to be filled in as we assert the read locks + LockKey[] locks = new LockKey[(int)checkValues.size()]; + int lockPointer = 0; + // Allocate a write lock vector, same deal + LockKey[] writeLocks = new LockKey[(int)setValues.size()]; + int writeLockPointer = 0; + try + { + WHKeyIterator iterator = checkValues.iterator(); + while (iterator.hasNext()) + { + WHKey key = iterator.next(); + LockKey lockKey = new LockKey(key); + grabLock(lockKey,threadID); + locks[lockPointer++] = lockKey; + WHKeyValue value = get(key); + WHKeyValue otherValue = checkValues.get(key); + if (value == null && otherValue == null) + continue; + if (value != null && otherValue != null && value.equals(otherValue)) + continue; + //System.out.println("Type of key in check contention = "+key.getClass().getName()); + throw new WHConcurrencyException(); + } + // We got all the read locks and passed the checks! Get the write locks now. These represent + // intent to change... 
+ iterator = setValues.iterator(); + while (iterator.hasNext()) + { + WHKey key = iterator.next(); + LockKey writeLockKey = new LockKey(key); + grabWriteLock(writeLockKey,threadID); + writeLocks[writeLockPointer++] = writeLockKey; + } + // Finally, do the commit + int releaseLockPointer = 0; + iterator = setValues.iterator(); + while (iterator.hasNext()) + { + WHKey key = iterator.next(); + WHKeyValue value = setValues.get(key); + if (value == null) + remove(key); + else + put(key,value); + // We can now release the write lock for this key + LockKey lockKey = writeLocks[releaseLockPointer]; + releaseWriteLock(lockKey,threadID); + writeLocks[releaseLockPointer++] = null; + } + } + finally + { + // Undo write locks so far set + while (writeLockPointer > 0) + { + LockKey lockKey = writeLocks[--writeLockPointer]; + if (lockKey != null) + releaseWriteLock(lockKey,threadID); + } + // Undo locks so far set + while (lockPointer > 0) + { + LockKey lockKey = locks[--lockPointer]; + releaseLock(lockKey,threadID); + } + } + } + + // Basic operations + + protected void put(Object key, Object value) + { + Map database = databaseArray[key.hashCode() & databaseMask]; + synchronized (database) + { + database.put(key,value); + } + } + + protected void remove(Object key) + { + Map database = databaseArray[key.hashCode() & databaseMask]; + synchronized (database) + { + database.remove(key); + } + } + + protected void grabLock(LockKey key, Long threadID) + throws WHConcurrencyException + { + try + { + Map database = databaseArray[key.hashCode() & databaseMask]; + synchronized (database) + { + LockValue lockValue = (LockValue)database.get(key); + if (lockValue == null) + { + lockValue = new LockValue(); + database.put(key,lockValue); + } + lockValue.addReadLock(threadID); + } + } + catch (WHConcurrencyException e) + { + //System.out.println("Read lock concurrency: "+key.getKey().getClass().getName()); + throw e; + } + } + + protected void grabWriteLock(LockKey key, Long threadID) + 
throws WHConcurrencyException + { + try + { + Map database = databaseArray[key.hashCode() & databaseMask]; + synchronized (database) + { + LockValue lockValue = (LockValue)database.get(key); + if (lockValue == null) + { + lockValue = new LockValue(); + database.put(key,lockValue); + } + lockValue.addWriteLock(threadID); + } + } + catch (WHConcurrencyException e) + { + //System.out.println("Write lock concurrency: "+key.getKey().getClass().getName()); + throw e; + } + } + + protected void releaseLock(LockKey key, Long threadID) + { + Map database = databaseArray[key.hashCode() & databaseMask]; + synchronized (database) + { + LockValue lockValue = (LockValue)database.get(key); + if (lockValue.removeReadLock(threadID)) + database.remove(key); + } + } + + protected void releaseWriteLock(LockKey key, Long threadID) + { + Map database = databaseArray[key.hashCode() & databaseMask]; + synchronized (database) + { + LockValue lockValue = (LockValue)database.get(key); + if (lockValue.removeWriteLock(threadID)) + database.remove(key); + } + } + + // Lock class + + protected static class LockKey + { + protected WHKey key; + + public LockKey(WHKey key) + { + this.key = key; + } + + public int hashCode() + { + return key.hashCode() + 1532; + } + + public boolean equals(Object o) + { + if (o.getClass() != getClass()) + return false; + return ((LockKey)o).key.equals(key); + } + + public WHKey getKey() + { + return key; + } + } + + // Lock value class + + protected static class LockValue + { + protected boolean writeLock = false; + protected boolean promotedReadLock = false; + protected Set threadIDs = new HashSet(); + + public LockValue() + { + } + + public void addReadLock(Long threadID) + throws WHConcurrencyException + { + if (writeLock) + throw new WHConcurrencyException(); + threadIDs.add(threadID); + } + + public boolean removeReadLock(Long threadID) + { + threadIDs.remove(threadID); + return threadIDs.size() == 0; + } + + public void addWriteLock(Long threadID) + throws 
WHConcurrencyException + { + if (writeLock || threadIDs.size() > 1 || (threadIDs.size() == 1 && !threadIDs.contains(threadID))) + throw new WHConcurrencyException(); + writeLock = true; + if (threadIDs.contains(threadID)) + promotedReadLock = true; + else + threadIDs.add(threadID); + } + + public boolean removeWriteLock(Long threadID) + { + writeLock = false; + if (promotedReadLock) + promotedReadLock = false; + else + threadIDs.remove(threadID); + return threadIDs.size() == 0; + } + } +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemByteTransactionImpl.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemByteTransactionImpl.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemByteTransactionImpl.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemByteTransactionImpl.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,242 @@ +/* $Id: InMemByteTransactionImpl.java 1208500 2011-11-30 16:38:45Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.warthog.tests; + +import org.apache.warthog.api.*; +import org.apache.warthog.bytekeyvalue.*; +import org.apache.warthog.transactionalkeyvaluestore.*; +import java.util.*; + +/** This class implements a byte-based transaction that is in progress, completely in memory. +*/ +public class InMemByteTransactionImpl implements WHTransaction +{ + protected WHAtomicByteKeyValueStore underlyingStore; + + protected KeyMap checkData = new KeyMap(); + protected KeyMap localData = null; + + /** Constructor */ + public InMemByteTransactionImpl(WHAtomicByteKeyValueStore underlyingStore) + { + this.underlyingStore = underlyingStore; + } + + /** Set a value. May be null. */ + public void put(WHKey key, WHKeyValue value) + throws WHException + { + if (localData == null) + localData = new KeyMap(); + localData.put(dematerialize(key),dematerialize(value)); + } + + /** Get a value. Null returned if no value. */ + public WHKeyValue get(WHKey key) + throws WHException + { + byte[] byteKey = dematerialize(key); + if (localData != null) + { + if (localData.containsKey(byteKey)) + return materialize(localData.get(byteKey)); + } + if (checkData.containsKey(byteKey)) + return materialize(checkData.get(byteKey)); + byte[] value = underlyingStore.get(byteKey); + checkData.put(byteKey,value); + return materialize(value); + } + + /** Check to see if this transaction has become inconsistent. + * If so, a WHDeadlockException is thrown. + */ + public void check() + throws WHException + { + underlyingStore.check(checkData); + } + + /** Commit this transaction */ + public void commit() + throws WHException + { + if (localData != null) + underlyingStore.setAll(checkData,localData); + else + underlyingStore.check(checkData); + } + + /** Abandon this transaction. + * This is called as a nicety to free any resources associated with the + * transaction. 
The implementation should also be robust as far as + * freeing resources if this method is NOT called, but might perform + * the necessary logic in a finalizer at an arbitrary time. + */ + public void abandon() + { + // Does nothing so long as the entire temporary transaction is in + // memory. + } + + protected byte[] dematerialize(WHKeyValue value) + { + if (value == null) + return null; + try + { + byte[] classNameBytes = value.getClass().getName().getBytes("ASCII"); + byte[] valueBytes = value.serializeObject(); + byte[] rval = new byte[1+classNameBytes.length+valueBytes.length]; + rval[0] = (byte)classNameBytes.length; + // Copy arrays around + System.arraycopy(classNameBytes,0,rval,1,classNameBytes.length); + System.arraycopy(valueBytes,0,rval,1+classNameBytes.length,valueBytes.length); + return rval; + } + catch (java.io.UnsupportedEncodingException e) + { + throw new RuntimeException("Couldn't locate ASCII encoder"); + } + } + + protected WHKeyValue materialize(byte[] bytes) + throws WHException + { + if (bytes == null) + return null; + try + { + int classNameLength = (int)bytes[0]; + String className = new String(bytes,1,classNameLength,"ASCII"); + byte[] serializedValue = new byte[bytes.length - (classNameLength+1)]; + // Copy bytes around + System.arraycopy(bytes,classNameLength+1,serializedValue,0,serializedValue.length); + try + { + Class classToMaterialize = Class.forName(className); + java.lang.reflect.Constructor constructor = classToMaterialize.getConstructor(new Class[]{byte[].class}); + Object o = constructor.newInstance(new Object[]{serializedValue}); + if (!(o instanceof WHKeyValue)) + throw new WHException("Class '"+className+"' is not an instance of WHValue"); + return (WHKeyValue)o; + } + catch (ClassNotFoundException e) + { + throw new WHException("Class '"+className+"' cannot be found"); + } + catch (java.lang.reflect.InvocationTargetException e) + { + throw new WHException("Constructor exception instantiating class '"+className+"': 
"+e.getMessage(),e); + } + catch (InstantiationException e) + { + throw new WHException("Instantiation exception for class '"+className+"': "+e.getMessage(),e); + } + catch (NoSuchMethodException e) + { + throw new WHException("Class '"+className+"' does not include a constructor with byte[] argument"); + } + catch (IllegalAccessException e) + { + throw new WHException("Class '"+className+"' byte[] constructor has protected access"); + } + } + catch (java.io.UnsupportedEncodingException e) + { + throw new RuntimeException("Couldn't locate ASCII encoder"); + } + } + + /** Local implementation of byte[] map */ + protected static class KeyMap implements WHByteKeyMap + { + protected Map map = new HashMap(); + + public KeyMap() + { + } + + /** Get a value from the map. + */ + public byte[] get(byte[] key) + throws WHException + { + return map.get(new WrappedByteKey(key)); + } + + /** Iterate over the keys in the map. + */ + public WHByteKeyIterator iterator() + throws WHException + { + return new KeyIterator(map.keySet().iterator()); + } + + /** Get the size of the map. + */ + public long size() + throws WHException + { + return (long)map.size(); + } + + /** Check if the map contains the specified key. + */ + public boolean containsKey(byte[] key) + { + return map.containsKey(new WrappedByteKey(key)); + } + + /** Put a value. 
+ */ + public void put(byte[] key, byte[] value) + { + map.put(new WrappedByteKey(key),value); + } + } + + /** Key iterator for KeyMap */ + protected static class KeyIterator implements WHByteKeyIterator + { + protected Iterator iterator; + + public KeyIterator(Iterator iterator) + { + this.iterator = iterator; + } + + /** Check if there is another value */ + public boolean hasNext() + throws WHException + { + return iterator.hasNext(); + } + + /** Get the next value */ + public byte[] next() + throws WHException + { + return iterator.next().getKey(); + } + + } + +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemByteTransactionalStoreImpl.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemByteTransactionalStoreImpl.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemByteTransactionalStoreImpl.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemByteTransactionalStoreImpl.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,46 @@ +/* $Id: InMemByteTransactionalStoreImpl.java 1206939 2011-11-28 00:37:54Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.warthog.tests; + +import org.apache.warthog.api.*; +import org.apache.warthog.bytekeyvalue.*; +import org.apache.warthog.transactionalkeyvaluestore.*; + +/** Class implementing a transactional byte-based key/value store completely in memory. +*/ +public class InMemByteTransactionalStoreImpl implements WHTransactionalStore +{ + /** Underlying atomic key/value store */ + protected WHAtomicByteKeyValueStore underlyingStore; + + /** Constructor. */ + public InMemByteTransactionalStoreImpl(WHAtomicByteKeyValueStore underlyingStore) + { + this.underlyingStore = underlyingStore; + } + + /** Create a transaction. 
*/ + public WHTransaction createTransaction() + throws WHException + { + return new InMemByteTransactionImpl(this.underlyingStore); + } + +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemNativeTransactionImpl.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemNativeTransactionImpl.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemNativeTransactionImpl.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemNativeTransactionImpl.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,173 @@ +/* $Id: InMemNativeTransactionImpl.java 1208500 2011-11-30 16:38:45Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/
+
package org.apache.warthog.tests;

import org.apache.warthog.api.*;
import org.apache.warthog.keyvalue.*;
import org.apache.warthog.transactionalkeyvaluestore.*;
import java.util.*;

/** This class implements a transaction that is in progress, completely in memory,
* using native hash methods.  WHKey objects are used directly as hash keys, so
* lookups depend on the equals()/hashCode() of the key classes themselves.
*
* Every value fetched from the underlying store is remembered in checkData, and
* every value written is buffered in localData; both maps are handed to the
* underlying store by check() and commit().
*/
public class InMemNativeTransactionImpl implements WHTransaction
{
  /** The store this transaction reads from and eventually commits to. */
  protected WHAtomicKeyValueStore underlyingStore;

  /** Values fetched from the underlying store so far. */
  protected KeyMap checkData = new KeyMap();
  /** Values written by this transaction; stays null until the first put(). */
  protected KeyMap localData = null;

  /** Constructor.
  *@param underlyingStore is the atomic store the transaction operates against.
  */
  public InMemNativeTransactionImpl(WHAtomicKeyValueStore underlyingStore)
  {
    this.underlyingStore = underlyingStore;
  }

  /** Set a value.  May be null.
  * The write is only buffered locally; nothing reaches the underlying store here.
  */
  public void put(WHKey key, WHKeyValue value)
    throws WHException
  {
    if (localData == null)
      localData = new KeyMap();
    localData.put(key,value);
  }

  /** Get a value.  Null returned if no value.
  * Lookup order: locally written data, then previously fetched data, then the
  * underlying store (whose answer is recorded in checkData, even when it is null).
  */
  public WHKeyValue get(WHKey key)
    throws WHException
  {
    if (localData != null && localData.containsKey(key))
      return localData.get(key);
    if (checkData.containsKey(key))
      return checkData.get(key);
    WHKeyValue value = underlyingStore.get(key);
    checkData.put(key,value);
    return value;
  }

  /** Check to see if this transaction has become inconsistent.
  * Delegates to the underlying store's check() with the values read so far; any
  * exception it raises is propagated unchanged.
  */
  public void check()
    throws WHException
  {
    underlyingStore.check(checkData);
  }

  /** Commit this transaction.
  * When something was written, the read set and the write set are handed to the
  * underlying store's setAll(); a read-only transaction only performs the check.
  */
  public void commit()
    throws WHException
  {
    if (localData != null)
      underlyingStore.setAll(checkData,localData);
    else
      underlyingStore.check(checkData);
  }

  /** Abandon this transaction.
  * This is called as a nicety to free any resources associated with the
  * transaction.  The implementation should also be robust as far as
  * freeing resources if this method is NOT called, but might perform
  * the necessary logic in a finalizer at an arbitrary time.
  */
  public void abandon()
  {
    // Does nothing so long as the entire temporary transaction is in
    // memory.
  }

  /** Local implementation of WHKey/WHValue map, backed by a HashMap keyed directly by WHKey. */
  protected static class KeyMap implements WHKeyMap
  {
    // Type arguments are required: with a raw Map, get() yields Object and the
    // accessors below do not type-check against their declared return types.
    protected Map<WHKey,WHKeyValue> map = new HashMap<WHKey,WHKeyValue>();

    public KeyMap()
    {
    }

    /** Get a value from the map.
    */
    public WHKeyValue get(WHKey key)
      throws WHException
    {
      return map.get(key);
    }

    /** Iterate over the keys in the map.
    */
    public WHKeyIterator iterator()
      throws WHException
    {
      return new KeyIterator(map.keySet().iterator());
    }

    /** Get the size of the map.
    */
    public long size()
      throws WHException
    {
      return (long)map.size();
    }

    /** Check if the map contains the specified key.
    */
    public boolean containsKey(WHKey key)
    {
      return map.containsKey(key);
    }

    /** Put a value.
    */
    public void put(WHKey key, WHKeyValue value)
    {
      map.put(key,value);
    }
  }

  /** Key iterator for KeyMap */
  protected static class KeyIterator implements WHKeyIterator
  {
    protected Iterator<WHKey> iterator;

    public KeyIterator(Iterator<WHKey> iterator)
    {
      this.iterator = iterator;
    }

    /** Check if there is another value */
    public boolean hasNext()
      throws WHException
    {
      return iterator.hasNext();
    }

    /** Get the next value */
    public WHKey next()
      throws WHException
    {
      return iterator.next();
    }

  }

}
Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemNativeTransactionalStoreImpl.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemNativeTransactionalStoreImpl.java?rev=1220349&view=auto
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemNativeTransactionalStoreImpl.java (added)
+++
incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemNativeTransactionalStoreImpl.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,46 @@ +/* $Id: InMemNativeTransactionalStoreImpl.java 1208116 2011-11-29 22:36:26Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.warthog.tests; + +import org.apache.warthog.api.*; +import org.apache.warthog.keyvalue.*; +import org.apache.warthog.transactionalkeyvaluestore.*; + +/** Class implementing a native transactional key/value store completely in memory. +*/ +public class InMemNativeTransactionalStoreImpl implements WHTransactionalStore +{ + /** Underlying atomic key/value store */ + protected WHAtomicKeyValueStore underlyingStore; + + /** Constructor. */ + public InMemNativeTransactionalStoreImpl(WHAtomicKeyValueStore underlyingStore) + { + this.underlyingStore = underlyingStore; + } + + /** Create a transaction. 
*/ + public WHTransaction createTransaction() + throws WHException + { + return new InMemNativeTransactionImpl(this.underlyingStore); + } + +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemTransactionImpl.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemTransactionImpl.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemTransactionImpl.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemTransactionImpl.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,172 @@ +/* $Id: InMemTransactionImpl.java 1208500 2011-11-30 16:38:45Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.warthog.tests; + +import org.apache.warthog.api.*; +import org.apache.warthog.keyvalue.*; +import org.apache.warthog.transactionalkeyvaluestore.*; +import java.util.*; + +/** This class implements a transaction that is in progress, completely in memory. 
+*/ +public class InMemTransactionImpl implements WHTransaction +{ + protected WHAtomicKeyValueStore underlyingStore; + + protected KeyMap checkData = new KeyMap(); + protected KeyMap localData = null; + + /** Constructor */ + public InMemTransactionImpl(WHAtomicKeyValueStore underlyingStore) + { + this.underlyingStore = underlyingStore; + } + + /** Set a value. May be null. */ + public void put(WHKey key, WHKeyValue value) + throws WHException + { + if (localData == null) + localData = new KeyMap(); + localData.put(key,value); + } + + /** Get a value. Null returned if no value. */ + public WHKeyValue get(WHKey key) + throws WHException + { + if (localData != null) + { + if (localData.containsKey(key)) + return localData.get(key); + } + if (checkData.containsKey(key)) + return checkData.get(key); + WHKeyValue value = underlyingStore.get(key); + checkData.put(key,value); + return value; + } + + /** Check to see if this transaction has become inconsistent. + * If so, a WHDeadlockException is thrown. + */ + public void check() + throws WHException + { + underlyingStore.check(checkData); + } + + /** Commit this transaction */ + public void commit() + throws WHException + { + if (localData != null) + underlyingStore.setAll(checkData,localData); + else + underlyingStore.check(checkData); + } + + /** Abandon this transaction. + * This is called as a nicety to free any resources associated with the + * transaction. The implementation should also be robust as far as + * freeing resources if this method is NOT called, but might perform + * the necessary logic in a finalizer at an arbitrary time. + */ + public void abandon() + { + // Does nothing so long as the entire temporary transaction is in + // memory. + } + + /** Local implementation of WHKey/WHValue map */ + protected static class KeyMap implements WHKeyMap + { + protected Map map = new HashMap(); + + public KeyMap() + { + } + + /** Get a value from the map. 
+ */ + public WHKeyValue get(WHKey key) + throws WHException + { + return map.get(new WrappedKey(key)); + } + + /** Iterate over the keys in the map. + */ + public WHKeyIterator iterator() + throws WHException + { + return new KeyIterator(map.keySet().iterator()); + } + + /** Get the size of the map. + */ + public long size() + throws WHException + { + return (long)map.size(); + } + + /** Check if the map contains the specified key. + */ + public boolean containsKey(WHKey key) + { + return map.containsKey(new WrappedKey(key)); + } + + /** Put a value. + */ + public void put(WHKey key, WHKeyValue value) + { + map.put(new WrappedKey(key),value); + } + } + + /** Key iterator for KeyMap */ + protected static class KeyIterator implements WHKeyIterator + { + protected Iterator iterator; + + public KeyIterator(Iterator iterator) + { + this.iterator = iterator; + } + + /** Check if there is another value */ + public boolean hasNext() + throws WHException + { + return iterator.hasNext(); + } + + /** Get the next value */ + public WHKey next() + throws WHException + { + return iterator.next().getKey(); + } + + } + +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemTransactionalStoreImpl.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemTransactionalStoreImpl.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemTransactionalStoreImpl.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/InMemTransactionalStoreImpl.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,46 @@ +/* $Id: InMemTransactionalStoreImpl.java 1205763 2011-11-24 09:15:57Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* 
contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.warthog.tests; + +import org.apache.warthog.api.*; +import org.apache.warthog.keyvalue.*; +import org.apache.warthog.transactionalkeyvaluestore.*; + +/** Class implementing a transactional key/value store completely in memory. +*/ +public class InMemTransactionalStoreImpl implements WHTransactionalStore +{ + /** Underlying atomic key/value store */ + protected WHAtomicKeyValueStore underlyingStore; + + /** Constructor. */ + public InMemTransactionalStoreImpl(WHAtomicKeyValueStore underlyingStore) + { + this.underlyingStore = underlyingStore; + } + + /** Create a transaction. 
*/ + public WHTransaction createTransaction() + throws WHException + { + return new InMemTransactionImpl(this.underlyingStore); + } + +} Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/MultiThreadTest.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/MultiThreadTest.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/MultiThreadTest.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/MultiThreadTest.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,473 @@ +/* $Id: MultiThreadTest.java 1212814 2011-12-10 15:26:17Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.warthog.tests; + +import org.apache.warthog.api.*; +import org.apache.warthog.transactionalkeyvaluestore.*; +import org.apache.warthog.keyvaluetablestore.*; +import org.apache.warthog.keyvalue.*; +import org.apache.warthog.common.*; +import org.junit.*; +import static org.junit.Assert.*; +import java.util.*; + +public class MultiThreadTest +{ + + @Test + public void singleThreadBaselineTest() + throws Exception + { + doWrites(1); + } + + @Test + public void writeOnlyTest() + throws Exception + { + doWrites(24); + } + + protected void doWrites(int threadCount) + throws Exception + { + int totalInserts = 10000; + + int repeatCount = totalInserts/threadCount; + + WHAtomicKeyValueStore store = new InMemAtomicNativeNonblockingKeyValueStore(16777216); + + WHTableStore ts = new TableStore(new InMemNativeTransactionalStoreImpl(store)); + + ts.beginTransaction(); + ts.createTable("testtable",new String[]{"colA","colB","colC","colD","colE","colF","colG","colH","colI","colJ"}); + ts.commitTransaction(); + + ts.beginTransaction(); + WHTable table = ts.lookupTable("testtable"); + ts.createIndex("testindex1",table, + new String[]{"colB","colA"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex2",table, + new String[]{"colD","colC"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex3",table, + new String[]{"colF","colE"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex4",table, + new String[]{"colH","colG"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex5",table, + new String[]{"colJ","colI"}, + new 
String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.commitTransaction(); + + InsertThread[] threads = new InsertThread[threadCount]; + for (int i = 0 ; i < threadCount ; i++) + { + threads[i] = new InsertThread("Worker thread "+i,store,repeatCount); + } + + long startTime = System.currentTimeMillis(); + for (int i = 0 ; i < threadCount ; i++) + { + threads[i].start(); + } + + // Join at the end + + int contentions = 0; + for (int i = 0 ; i < threadCount ; i++) + { + threads[i].join(); + Throwable e = threads[i].getException(); + if (e != null) + { + if (e instanceof Error) + throw (Error)e; + if (e instanceof RuntimeException) + throw (RuntimeException)e; + throw (Exception)e; + } + contentions += threads[i].getContentions(); + } + + System.out.println(Integer.toString(totalInserts)+" writes across "+ + Integer.toString(threadCount)+" threads took "+ + new Long(System.currentTimeMillis()-startTime)+" ms and had "+ + Integer.toString(contentions)+" contention retries"); + } + + @Test + public void writeQueryTest() + throws Exception + { + int totalCount = 10000/4; + int threadCount = 24; + + int repeatCount = totalCount/threadCount; + + WHAtomicKeyValueStore store = new InMemAtomicNativeNonblockingKeyValueStore(16777216); + + WHTableStore ts = new TableStore(new InMemNativeTransactionalStoreImpl(store)); + + ts.beginTransaction(); + ts.createTable("testtable",new String[]{"colA","colB","colC","colD","colE","colF","colG","colH","colI","colJ"}); + ts.commitTransaction(); + + ts.beginTransaction(); + WHTable table = ts.lookupTable("testtable"); + ts.createIndex("testindex1",table, + new String[]{"colB","colA"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex2",table, + new String[]{"colD","colC"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + 
"org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex3",table, + new String[]{"colF","colE"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex4",table, + new String[]{"colH","colG"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex5",table, + new String[]{"colJ","colI"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.commitTransaction(); + + TaskThread[] threads = new TaskThread[threadCount]; + for (int i = 0 ; i < threadCount ; i++) + { + threads[i] = new TaskThread("Worker thread "+i,store,repeatCount); + } + + long startTime = System.currentTimeMillis(); + for (int i = 0 ; i < threadCount ; i++) + { + threads[i].start(); + } + + // Join at the end + + int contentions = 0; + for (int i = 0 ; i < threadCount ; i++) + { + threads[i].join(); + Throwable e = threads[i].getException(); + if (e != null) + { + if (e instanceof Error) + throw (Error)e; + if (e instanceof RuntimeException) + throw (RuntimeException)e; + throw (Exception)e; + } + contentions += threads[i].getContentions(); + } + + System.out.println(Integer.toString(threadCount*repeatCount)+" writes and "+ + Integer.toString(threadCount*repeatCount*3)+" reads across "+ + Integer.toString(threadCount)+" threads took "+ + new Long(System.currentTimeMillis()-startTime)+" ms and had "+ + Integer.toString(contentions)+" contention retries"); + } + + // Protected methods + + protected static String randomString(Random r) + { + return new Long(randomLong(r)).toString(); + } + + protected static long randomLong(Random r) + { + return r.nextLong(); + } + + protected static class InsertThread extends Thread + { + protected int repeatCount; + protected int contentions = 0; 
+ protected WHAtomicKeyValueStore store; + protected Throwable exception; + protected Random r = new Random(); + + public InsertThread(String name, WHAtomicKeyValueStore store, int repeatCount) + { + super(); + this.repeatCount = repeatCount; + this.store = store; + setName(name); + } + + public void run() + { + try + { + WHTableStore ts = new TableStore(new InMemNativeTransactionalStoreImpl(store)); + performInserts(ts); + } + catch (Throwable e) + { + exception = e; + } + } + + public Throwable getException() + { + return exception; + } + + public int getContentions() + { + return contentions; + } + + protected void backOff() + throws InterruptedException + { + contentions++; + //Thread.sleep(r.nextInt(100)); + Thread.yield(); + } + + protected void performInserts(WHTableStore ts) + throws InterruptedException, WHException + { + // Task consists of a certain number of inserts and queries + for (int i = 0 ; i < repeatCount ; i++) + { + // Add a row + while (true) + { + ts.beginTransaction(); + try + { + WHTable table = ts.lookupTable("testtable"); + table.insertRow(new String[]{"colA","colB","colC","colD","colE","colF","colG","colH","colI","colJ"}, + new WHValue[]{ + new LongValue(randomLong(r)),new StringValue(randomString(r)), + new LongValue(randomLong(r)),new StringValue(randomString(r)), + new LongValue(randomLong(r)),new StringValue(randomString(r)), + new LongValue(randomLong(r)),new StringValue(randomString(r)), + new LongValue(randomLong(r)),new StringValue(randomString(r))}); + ts.commitTransaction(); + break; + } + catch (WHConcurrencyException e) + { + backOff(); + continue; + } + finally + { + ts.abandonTransaction(); + } + } + } + } + + } + +
  /**
  * Worker thread that runs a mixed insert/query workload against its own
  * table store built on top of the supplied key/value store.
  * Each iteration inserts one row of random values into "testtable" and then
  * walks three index ranges.  Any step that fails with a
  * WHConcurrencyException is retried after backing off; the number of such
  * retries is available from getContentions().  Any other throwable ends the
  * thread and is available from getException().
  */
  protected static class TaskThread extends Thread
  {
    /** Number of insert-plus-queries iterations to run. */
    protected int repeatCount;
    /** Number of times a step was retried because of a concurrency conflict. */
    protected int contentions = 0;
    /** Key/value store the per-thread table store is layered on. */
    protected WHAtomicKeyValueStore store;
    /** Throwable that ended run(), or null if none was caught. */
    protected Throwable exception;
    /** Source of the random column values. */
    protected Random r = new Random();

    /** Create the worker.
    *@param name is the thread name.
    *@param store is the key/value store to build the table store on.
    *@param repeatCount is the number of iterations performTask() runs.
    */
    public TaskThread(String name, WHAtomicKeyValueStore store, int repeatCount)
    {
      super();
      this.repeatCount = repeatCount;
      this.store = store;
      setName(name);
    }

    /** Build a table store over the shared key/value store and run the task.
    * Nothing is propagated out of the thread; whatever is thrown is recorded
    * for later retrieval through getException().
    */
    @Override
    public void run()
    {
      try
      {
        WHTableStore ts = new TableStore(new InMemNativeTransactionalStoreImpl(store));
        performTask(ts);
      }
      catch (Throwable e)
      {
        exception = e;
      }
    }

    /** @return the throwable caught by run(), or null if there was none. */
    public Throwable getException()
    {
      return exception;
    }

    /** @return how many times backOff() has been called by this thread. */
    public int getContentions()
    {
      return contentions;
    }

    /** Record one contention and give other threads a chance to run. */
    protected void backOff()
      throws InterruptedException
    {
      contentions++;
      //Thread.sleep(r.nextInt(100));
      Thread.yield();
    }

    /** Run repeatCount iterations of: one row insert followed by three index
    * range scans, each in its own transaction.
    *@param ts is the table store to work against.
    */
    protected void performTask(WHTableStore ts)
      throws InterruptedException, WHException
    {
      for (int i = 0 ; i < repeatCount ; i++)
      {
        insertRandomRow(ts);
        scanIndexRange(ts,"testindex1","1","2");
        scanIndexRange(ts,"testindex2","23","25");
        scanIndexRange(ts,"testindex3","02","04");
      }
    }

    /** Insert one row of random values into "testtable", retrying until the
    * transaction commits without a WHConcurrencyException.
    */
    private void insertRandomRow(WHTableStore ts)
      throws InterruptedException, WHException
    {
      while (true)
      {
        ts.beginTransaction();
        try
        {
          WHTable table = ts.lookupTable("testtable");
          table.insertRow(new String[]{"colA","colB","colC","colD","colE","colF","colG","colH","colI","colJ"},
            new WHValue[]{
              new LongValue(randomLong(r)),new StringValue(randomString(r)),
              new LongValue(randomLong(r)),new StringValue(randomString(r)),
              new LongValue(randomLong(r)),new StringValue(randomString(r)),
              new LongValue(randomLong(r)),new StringValue(randomString(r)),
              new LongValue(randomLong(r)),new StringValue(randomString(r))});
          ts.commitTransaction();
          return;
        }
        catch (WHConcurrencyException e)
        {
          // Conflict: count it, yield, and go around the loop again.
          backOff();
        }
        finally
        {
          // Runs on every path, including after a successful commit.
          // NOTE(review): presumably harmless once committed -- confirm.
          ts.abandonTransaction();
        }
      }
    }

    /** Walk every row the named index returns for a between-criteria on its
    * first column, retrying until the transaction commits without a
    * WHConcurrencyException.  The rows themselves are not examined.
    *@param ts is the table store to query.
    *@param indexName is the name of the index to look up.
    *@param low is the first StringValue bound handed to IndexBetween.
    *@param high is the second StringValue bound handed to IndexBetween.
    */
    private void scanIndexRange(WHTableStore ts, String indexName, String low, String high)
      throws InterruptedException, WHException
    {
      while (true)
      {
        ts.beginTransaction();
        try
        {
          WHIndex index = ts.lookupIndex(indexName);
          // Criteria on the first index column only; the second column is left null.
          WHAccessor accessor = index.buildAccessor(new IndexCriteria[]{
            new IndexBetween(new StringValue(low),new StringValue(high)),
            null},null);
          while (accessor.getCurrentRowID() != null)
          {
            accessor.advance();
          }
          ts.commitTransaction();
          return;
        }
        catch (WHConcurrencyException e)
        {
          // Conflict: count it, yield, and go around the loop again.
          backOff();
        }
        finally
        {
          // Runs on every path, including after a successful commit.
          // NOTE(review): presumably harmless once committed -- confirm.
          ts.abandonTransaction();
        }
      }
    }

  }
+ +} \ No newline at end of file Added: incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/PerformanceTest.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/PerformanceTest.java?rev=1220349&view=auto ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/PerformanceTest.java (added) +++ incubator/lcf/branches/CONNECTORS-286/warthog-reimport/src/test/java/org/apache/warthog/tests/PerformanceTest.java Sun Dec 18 08:26:30 2011 @@ -0,0 +1,121 @@ +/* $Id: PerformanceTest.java 1208116 2011-11-29 22:36:26Z kwright $ */ + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. 
See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.warthog.tests; + +import org.apache.warthog.api.*; +import org.apache.warthog.transactionalkeyvaluestore.*; +import org.apache.warthog.keyvaluetablestore.*; +import org.apache.warthog.keyvalue.*; +import org.apache.warthog.common.*; +import org.junit.*; +import static org.junit.Assert.*; +import java.util.*; + +public class PerformanceTest +{ + + @Test + public void rowCreateTest() + throws Exception + { + WHTableStore ts = new TableStore(new InMemNativeTransactionalStoreImpl(new InMemAtomicNativeKeyValueStore(16777216))); + + ts.beginTransaction(); + ts.createTable("testtable",new String[]{"colA","colB","colC","colD","colE","colF","colG","colH","colI","colJ"}); + ts.commitTransaction(); + + ts.beginTransaction(); + WHTable table = ts.lookupTable("testtable"); + ts.createIndex("testindex1",table, + new String[]{"colB","colA"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex2",table, + new String[]{"colD","colC"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex3",table, + new String[]{"colF","colE"}, + new 
String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex4",table, + new String[]{"colH","colG"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.createIndex("testindex5",table, + new String[]{"colJ","colI"}, + new String[]{"org.apache.warthog.common.StringComparatorAscending", + "org.apache.warthog.common.LongComparatorAscending"},false); + ts.commitTransaction(); + + int rowCount = 10000; + Random r = new Random(); + String[] str1Values = new String[rowCount]; + long[] long1Values = new long[rowCount]; + String[] str2Values = new String[rowCount]; + long[] long2Values = new long[rowCount]; + String[] str3Values = new String[rowCount]; + long[] long3Values = new long[rowCount]; + String[] str4Values = new String[rowCount]; + long[] long4Values = new long[rowCount]; + String[] str5Values = new String[rowCount]; + long[] long5Values = new long[rowCount]; + for (int i = 0 ; i < rowCount ; i++) + { + str1Values[i] = randomString(r); + long1Values[i] = randomLong(r); + str2Values[i] = randomString(r); + long2Values[i] = randomLong(r); + str3Values[i] = randomString(r); + long3Values[i] = randomLong(r); + str4Values[i] = randomString(r); + long4Values[i] = randomLong(r); + str5Values[i] = randomString(r); + long5Values[i] = randomLong(r); + } + long startTime = System.currentTimeMillis(); + for (int i = 0 ; i < rowCount ; i++) + { + ts.beginTransaction(); + table = ts.lookupTable("testtable"); + table.insertRow(new String[]{"colA","colB","colC","colD","colE","colF","colG","colH","colI","colJ"}, + new WHValue[]{ + new LongValue(long1Values[i]),new StringValue(str1Values[i]), + new LongValue(long2Values[i]),new StringValue(str2Values[i]), + new LongValue(long3Values[i]),new StringValue(str3Values[i]), + new LongValue(long4Values[i]),new StringValue(str4Values[i]), + new 
LongValue(long5Values[i]),new StringValue(str5Values[i])}); + ts.commitTransaction(); + } + System.out.println("Create of "+Integer.toString(rowCount)+" rows took "+new Long(System.currentTimeMillis() - startTime).toString()+" ms"); + } + + // Protected methods + + protected static String randomString(Random r) + { + return new Long(randomLong(r)).toString(); + } + + protected static long randomLong(Random r) + { + return r.nextLong(); + } + +} \ No newline at end of file