activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r702452 [2/2] - in /activemq/trunk: activemq-core/ activemq-core/src/test/java/org/apache/activemq/perf/ activemq-core/src/test/java/org/apache/activemq/transport/failover/ activemq-core/src/test/resources/ activemq-groups/ activemq-groups/...
Date Tue, 07 Oct 2008 12:32:02 GMT
Added: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/GroupUpdateException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/GroupUpdateException.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/GroupUpdateException.java (added)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/GroupUpdateException.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups;
+
+/**
+ * Signals an attempt to update a map key that the local client does not own.
+ * It is an unchecked exception, so callers are not forced to declare it.
+ */
+public class GroupUpdateException extends RuntimeException {
+    private static final long serialVersionUID = -7584658017201604560L;
+
+    /**
+     * @param message the detail message handed to {@link RuntimeException}
+     */
+    public GroupUpdateException(final String message) {
+        super(message);
+    }
+}

Propchange: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/GroupUpdateException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Member.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Member.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Member.java (added)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Member.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import javax.jms.Destination;
+import org.apache.activemq.util.IdGenerator;
+
+/**
+ * <P>
+ * A <CODE>Member</CODE> holds information about a member of the group.
+ * Equality and hash code are based on the id alone.
+ */
+public class Member implements Externalizable {
+    private String name;
+    private String id;
+    private String hostname;
+    private long startTime;
+    private int coordinatorWeight;
+    private Destination inBoxDestination;
+    private transient long timeStamp;
+
+
+    /**
+     * Default constructor - only used by serialization
+     */
+    public Member() {
+    }
+    /**
+     * @param name the member name; host name and start time are recorded at construction
+     */
+    public Member(String name) {
+        this.name = name;
+        this.hostname = IdGenerator.getHostName();
+        this.startTime = System.currentTimeMillis();
+    }
+
+    /**
+     * @return the name
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @return the id, <code>null</code> until one has been set
+     */
+    public String getId() {
+        return this.id;
+    }
+
+    void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the hostname
+     */
+    public String getHostname() {
+        return this.hostname;
+    }
+
+    /**
+     * @return the startTime
+     */
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    /**
+     * @return the inbox destination
+     */
+    public Destination getInBoxDestination() {
+        return this.inBoxDestination;
+    }
+
+    void setInBoxDestination(Destination dest) {
+        this.inBoxDestination = dest;
+    }
+
+    /**
+     * @return the timeStamp
+     */
+    long getTimeStamp() {
+        return this.timeStamp;
+    }
+
+    /**
+     * @param timeStamp the timeStamp to set
+     */
+    void setTimeStamp(long timeStamp) {
+        this.timeStamp = timeStamp;
+    }
+    /**
+     * @return the coordinatorWeight
+     */
+    public int getCoordinatorWeight() {
+        return this.coordinatorWeight;
+    }
+    /**
+     * @param coordinatorWeight the coordinatorWeight to set
+     */
+    public void setCoordinatorWeight(int coordinatorWeight) {
+        this.coordinatorWeight = coordinatorWeight;
+    }
+    /**
+     * @return this member rendered as <code>name[id]@hostname</code>
+     */
+    public String toString() {
+        return this.name+"["+this.id+"]@"+this.hostname;
+    }
+
+    /** Reads the fields in the order writeExternal emits them; the weight is present twice. */
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        this.coordinatorWeight = in.readInt();
+        this.name = in.readUTF();
+        this.id = in.readUTF();
+        this.hostname = in.readUTF();
+        this.startTime = in.readLong();
+        this.inBoxDestination = (Destination) in.readObject();
+        this.coordinatorWeight = in.readInt();
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(this.coordinatorWeight);
+        out.writeUTF(this.name != null ? this.name : "");
+        out.writeUTF(this.id != null ? this.id : "");
+        out.writeUTF(this.hostname != null ? this.hostname : "");
+        out.writeLong(this.startTime);
+        out.writeObject(this.inBoxDestination);
+        out.writeInt(this.coordinatorWeight);
+    }
+
+    public int hashCode() {
+        return this.id != null ? this.id.hashCode() : 0; // id is unset on a freshly built member
+    }
+
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        // without an id a member can only equal itself; this also avoids an NPE on a null id
+        return obj instanceof Member && this.id != null
+            && this.id.equals(((Member) obj).id);
+    }
+}

Propchange: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Member.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/MemberChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/MemberChangedListener.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/MemberChangedListener.java (added)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/MemberChangedListener.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activegroups;
+
+/**
+ * A listener for membership changes to a group.
+ * Each callback receives the {@link Member} the notification is about.
+ */
+public interface MemberChangedListener {
+    
+    /**
+     * Notification that a member has started.
+     * @param member the member that started
+     */
+    void memberStarted(Member member);
+    
+    /**
+     * Notification that a member has stopped.
+     * @param member the member that stopped
+     */
+    void memberStopped(Member member);
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/MemberChangedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/AsyncMapRequest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/AsyncMapRequest.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/AsyncMapRequest.java (added)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/AsyncMapRequest.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups.command;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tracks outstanding map requests so that a caller can wait for all of them to finish.
+ */
+public class AsyncMapRequest implements RequestCallback{
+    private final Object mutex = new Object();
+    private final Set<String> requests = new HashSet<String>();
+
+    public void add(String id, MapRequest request) {
+        synchronized (this.mutex) {
+            this.requests.add(id);
+        }
+        request.setCallback(this); // id is registered first, so a prompt finished(id) finds it
+    }
+    /**
+     * Waits until every added request has finished or the timeout elapses.
+     * @param timeout maximum wait in milliseconds; non-positive values do not wait
+     * @return true if no request is pending when the wait ends
+     */
+    public boolean isSuccess(long timeout) {
+        long deadline = System.currentTimeMillis() + timeout;
+        synchronized (this.mutex) {
+            try {
+                while (timeout > 0 && !this.requests.isEmpty()) { // wait(0) would block forever
+                    this.mutex.wait(timeout);
+                    timeout = deadline - System.currentTimeMillis();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+            }
+            return this.requests.isEmpty();
+        }
+    }
+
+    public void finished(String id) {
+        synchronized (this.mutex) {
+            this.requests.remove(id);
+            this.mutex.notifyAll(); // wake threads blocked in isSuccess
+        }
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/AsyncMapRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/ElectionMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/ElectionMessage.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/ElectionMessage.java (added)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/ElectionMessage.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups.command;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.activegroups.Member;
+
+/**
+ * Carries a {@link Member} together with the {@link MessageType} of the message
+ * (election, answer or coordinator).
+ */
+public class ElectionMessage implements Externalizable{
+    public enum MessageType { ELECTION, ANSWER, COORDINATOR }
+    private Member member;
+    private MessageType type;
+
+    /**
+     * @return the member carried by this message
+     */
+    public Member getMember() {
+        return member;
+    }
+
+    /**
+     * @param member the member to carry
+     */
+    public void setMember(Member member) {
+        this.member = member;
+    }
+
+    /**
+     * @return the message type, possibly null
+     */
+    public MessageType getType() {
+        return type;
+    }
+
+    /**
+     * @param type the message type to set
+     */
+    public void setType(MessageType type) {
+        this.type = type;
+    }
+
+    /**
+     * @return true only when the type is ELECTION
+     */
+    public boolean isElection() {
+        return MessageType.ELECTION == type;
+    }
+
+    /**
+     * @return true only when the type is ANSWER
+     */
+    public boolean isAnswer() {
+        return MessageType.ANSWER == type;
+    }
+
+    /**
+     * @return true only when the type is COORDINATOR
+     */
+    public boolean isCoordinator() {
+        return MessageType.COORDINATOR == type;
+    }
+
+    /** @return a new message referring to the same member and type */
+    public ElectionMessage copy() {
+        final ElectionMessage duplicate = new ElectionMessage();
+        duplicate.type = type;
+        duplicate.member = member;
+        return duplicate;
+    }
+
+    /** Restores the member and then the type, mirroring writeExternal. */
+    public void readExternal(ObjectInput in) throws IOException,
+            ClassNotFoundException {
+        member = (Member) in.readObject();
+        type = (MessageType) in.readObject();
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(member);
+        out.writeObject(type);
+    }
+
+    public String toString() {
+        return new StringBuilder("ElectionMessage: ").append(member).append("{").append(type).append("}").toString();
+    }
+}

Propchange: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/ElectionMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryKey.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryKey.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryKey.java (added)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryKey.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups.command;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.activegroups.Member;
+
+/**
+ * Holds information about an EntryKey: the key itself, its owner, lock state
+ * and expiration times. Equality and hash code depend on the key only.
+ */
+public class EntryKey<K> implements Externalizable {
+    private Member owner;
+    private K key;
+    private boolean locked;
+    private boolean removeOnExit;
+    private boolean releaseLockOnExit;
+    private long expiration;
+    private long lockExpiration;
+
+    /**
+     * Default constructor - for serialization
+     */
+    public EntryKey() {
+    }
+
+    public EntryKey(Member owner, K key) {
+        this.owner = owner;
+        this.key = key;
+    }
+
+    public int hashCode() {
+        return this.key != null ? this.key.hashCode() : super.hashCode();
+    }
+
+    /**
+     * @return the owner
+     */
+    public Member getOwner() {
+        return this.owner;
+    }
+
+    public void setOwner(Member member) {
+        this.owner=member;
+    }
+
+    /**
+     * @return the key
+     */
+    public K getKey() {
+        return this.key;
+    }
+
+    /**
+     * @return true if the locked flag is set
+     */
+    public boolean isLocked() {
+        return this.locked;
+    }
+
+    /**
+     * @param locked the locked flag to set
+     */
+    public void setLocked(boolean locked) {
+        this.locked = locked;
+    }
+
+    /**
+     * @return the removeOnExit
+     */
+    public boolean isRemoveOnExit() {
+        return this.removeOnExit;
+    }
+
+    /**
+     * @param removeOnExit
+     *            the removeOnExit to set
+     */
+    public void setRemoveOnExit(boolean removeOnExit) {
+        this.removeOnExit = removeOnExit;
+    }
+
+    /**
+     * @return the expiration time in milliseconds, 0 meaning the entry never expires
+     */
+    public long getExpiration() {
+        return expiration;
+    }
+
+    /**
+     * @param expiration the expiration to set
+     */
+    public void setExpiration(long expiration) {
+        this.expiration = expiration;
+    }
+
+    /**
+     * @return the lock expiration time in milliseconds, 0 meaning the lock never expires
+     */
+    public long getLockExpiration() {
+        return lockExpiration;
+    }
+
+    /**
+     * @param lockExpiration the lockExpiration to set
+     */
+    public void setLockExpiration(long lockExpiration) {
+        this.lockExpiration = lockExpiration;
+    }
+
+    /**
+     * @return the releaseLockOnExit
+     */
+    public boolean isReleaseLockOnExit() {
+        return releaseLockOnExit;
+    }
+
+    /**
+     * @param releaseLockOnExit the releaseLockOnExit to set
+     */
+    public void setReleaseLockOnExit(boolean releaseLockOnExit) {
+        this.releaseLockOnExit = releaseLockOnExit;
+    }
+
+    public void setTimeToLive(long ttl) {
+        if (ttl > 0 ) {
+            this.expiration=ttl+System.currentTimeMillis(); // expire ttl ms from now
+        }else {
+            this.expiration =0L; // 0 disables expiration, see isExpired(long)
+        }
+    }
+
+    public void setLockLeaseTime(long ttl) {
+        if(ttl > 0) {
+            this.lockExpiration=ttl+System.currentTimeMillis(); // lock lapses ttl ms from now
+        }else {
+            this.lockExpiration=0L; // 0 disables lock expiry, see isLockExpired(long)
+        }
+    }
+
+    public boolean isExpired() {
+        return isExpired(System.currentTimeMillis());
+    }
+
+    public boolean isExpired(long currentTime) {
+        return this.expiration > 0 && this.expiration < currentTime;
+    }
+
+    public boolean isLockExpired() {
+        return isLockExpired(System.currentTimeMillis());
+    }
+
+    public boolean isLockExpired(long currentTime) {
+        return this.lockExpiration > 0 && this.lockExpiration < currentTime;
+    }
+
+    /** Compares keys only; an entry whose key is null equals nothing but itself. */
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        // null-safe on both sides: the default constructor leaves the key null
+        return obj instanceof EntryKey && this.key != null
+            && this.key.equals(((EntryKey<?>) obj).key);
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(this.owner);
+        out.writeObject(this.key);
+        out.writeBoolean(isLocked());
+        out.writeBoolean(isRemoveOnExit());
+        out.writeBoolean(isReleaseLockOnExit());
+        out.writeLong(getExpiration());
+        out.writeLong(getLockExpiration());
+    }
+
+    public void readExternal(ObjectInput in) throws IOException,
+            ClassNotFoundException {
+        this.owner = (Member) in.readObject();
+        this.key = (K) in.readObject();
+        this.locked = in.readBoolean();
+        this.removeOnExit=in.readBoolean();
+        this.releaseLockOnExit=in.readBoolean();
+        this.expiration=in.readLong();
+        this.lockExpiration=in.readLong();
+    }
+
+    public String toString() {
+        return "key:"+this.key;
+    }
+
+
+}

Propchange: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryKey.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryMessage.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryMessage.java (added)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryMessage.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups.command;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Bundles an {@link EntryKey} with a value, an old value, a {@link MessageType}
+ * and a set of boolean flags.
+ */
+public class EntryMessage implements Externalizable{
+    public enum MessageType { INSERT, DELETE, SYNC }
+    private EntryKey key;
+    private Object value;
+    private Object oldValue;
+    private MessageType type;
+    private boolean mapUpdate;
+    private boolean expired;
+    private boolean lockExpired;
+    private boolean lockUpdate;
+
+    /**
+     * @return the key
+     */
+    public EntryKey getKey() {
+        return key;
+    }
+    /**
+     * @param key the key to set
+     */
+    public void setKey(EntryKey key) {
+        this.key = key;
+    }
+    /**
+     * @return the value
+     */
+    public Object getValue() {
+        return value;
+    }
+    /**
+     * @param value the value to store
+     */
+    public void setValue(Object value) {
+        this.value = value;
+    }
+
+    /**
+     * @return the old value
+     */
+    public Object getOldValue() {
+        return oldValue;
+    }
+    /**
+     * @param oldValue the old value to store
+     */
+    public void setOldValue(Object oldValue) {
+        this.oldValue = oldValue;
+    }
+
+    /**
+     * @return the message type, possibly null
+     */
+    public MessageType getType() {
+        return type;
+    }
+    /**
+     * @param type the message type to set
+     */
+    public void setType(MessageType type) {
+        this.type = type;
+    }
+
+    /**
+     * @return the mapUpdate flag
+     */
+    public boolean isMapUpdate() {
+        return mapUpdate;
+    }
+    /**
+     * @param mapUpdate the mapUpdate flag to set
+     */
+    public void setMapUpdate(boolean mapUpdate) {
+        this.mapUpdate = mapUpdate;
+    }
+
+    /**
+     * @return the expired flag
+     */
+    public boolean isExpired() {
+        return this.expired;
+    }
+    /**
+     * @param expired the expired flag to set
+     */
+    public void setExpired(boolean expired) {
+        this.expired = expired;
+    }
+
+    /**
+     * @return the lockExpired flag
+     */
+    public boolean isLockExpired() {
+        return this.lockExpired;
+    }
+    /**
+     * @param lockExpired the lockExpired flag to set
+     */
+    public void setLockExpired(boolean lockExpired) {
+        this.lockExpired = lockExpired;
+    }
+
+    /**
+     * @return the lockUpdate flag
+     */
+    public boolean isLockUpdate() {
+        return this.lockUpdate;
+    }
+    /**
+     * @param lockUpdate the lockUpdate flag to set
+     */
+    public void setLockUpdate(boolean lockUpdate) {
+        this.lockUpdate = lockUpdate;
+    }
+
+    /**
+     * @return true only when the type is INSERT
+     */
+    public boolean isInsert() {
+        return MessageType.INSERT == type;
+    }
+
+    /**
+     * @return true only when the type is DELETE
+     */
+    public boolean isDelete() {
+        return MessageType.DELETE == type;
+    }
+
+    public boolean isSync() {
+        return MessageType.SYNC == type;
+    }
+
+    /** @return a new message holding the same references and flag values */
+    public EntryMessage copy() {
+        final EntryMessage duplicate = new EntryMessage();
+        duplicate.type = type;
+        duplicate.key = key;
+        duplicate.value = value;
+        duplicate.oldValue = oldValue;
+        duplicate.mapUpdate = mapUpdate;
+        duplicate.expired = expired;
+        duplicate.lockExpired = lockExpired;
+        duplicate.lockUpdate = lockUpdate;
+        return duplicate;
+    }
+
+
+    /** Restores the fields in the order writeExternal emits them. */
+    public void readExternal(ObjectInput in) throws IOException,
+            ClassNotFoundException {
+        key = (EntryKey) in.readObject();
+        value = in.readObject();
+        oldValue = in.readObject();
+        type = (MessageType) in.readObject();
+        mapUpdate = in.readBoolean();
+        expired = in.readBoolean();
+        lockExpired = in.readBoolean();
+        lockUpdate = in.readBoolean();
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(key);
+        out.writeObject(value);
+        out.writeObject(oldValue);
+        out.writeObject(type);
+        out.writeBoolean(mapUpdate);
+        out.writeBoolean(expired);
+        out.writeBoolean(lockExpired);
+        out.writeBoolean(lockUpdate);
+    }
+
+    public String toString() {
+        return new StringBuilder("EntryMessage: ").append(type).append("[").append(key).append(",")
+            .append(value).append("]{update=").append(mapUpdate).append("}").toString();
+    }
+
+}

Propchange: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryValue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryValue.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryValue.java (added)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryValue.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups.command;
+
+
+
+/**
+ * Holds information about the Value in the Map.
+ * Equality and hash code are derived from the value only; the key is ignored.
+ */
+public class EntryValue<V> {
+    private final EntryKey key;
+    private V value;
+
+    public EntryValue(EntryKey key, V value){
+        this.key=key;
+        this.value=value;
+    }
+
+    /**
+     * @return the key
+     */
+    public EntryKey getKey() {
+        return this.key;
+    }
+
+    /**
+     * @return the value
+     */
+    public V getValue() {
+        return this.value;
+    }
+
+    /**
+     * set the value
+     * @param value the new value
+     */
+    public void setValue(V value) {
+        this.value=value;
+    }
+
+    public int hashCode() {
+        // equals treats two null values as equal, so they must share one hash code
+        return this.value != null ? this.value.hashCode() : 0;
+    }
+
+    public boolean equals(Object obj) {
+        if (!(obj instanceof EntryValue)) {
+            return false;
+        }
+        Object theirs = ((EntryValue<?>) obj).value;
+        // a null value only matches another null value
+        return this.value == null ? theirs == null
+            : theirs != null && this.value.equals(theirs);
+    }
+}
+

Propchange: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/EntryValue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/MapRequest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/MapRequest.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/MapRequest.java (added)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/MapRequest.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups.command;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Holds the response to a map request and lets a caller wait for it to arrive
+ * 
+ */
+public class MapRequest {
+    private static final Log LOG = LogFactory.getLog(MapRequest.class);
+    private final AtomicBoolean done = new AtomicBoolean();
+    private Object response;
+    private RequestCallback callback;
+
+    public Object get(long timeout) {
+        synchronized (this.done) {
+            if (this.done.get() == false && this.response == null) {
+                try {
+                    this.done.wait(timeout);
+                } catch (InterruptedException e) {
+                    LOG.warn("Interrupted in  get("+timeout+")",e);
+                }
+            }
+        }
+        return this.response;
+    }
+
+    public void put(String id,Object response) {
+        this.response = response;
+        cancel();
+        RequestCallback callback = this.callback;
+        if (callback != null) {
+            callback.finished(id);
+        }
+    }
+
+    public void cancel() {
+        this.done.set(true);
+        synchronized (this.done) {
+            this.done.notifyAll();
+        }
+    }
+    
+    public void setCallback(RequestCallback callback) {
+        this.callback=callback;
+    }
+}

Propchange: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/MapRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/RequestCallback.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/RequestCallback.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/RequestCallback.java (added)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/RequestCallback.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups.command;
+
+
+/**
+ * Callback notified when a request has finished
+ * 
+ */
+public interface RequestCallback{
+    /**
+     * Optionally called when a request is finished
+     * @param id the id of the finished request, e.g. the id handed to
+     *           {@link MapRequest#put(String, Object)}
+     */
+    void finished(String id);
+}

Propchange: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/command/RequestCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/package.html
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/package.html?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/package.html (added)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/package.html Tue Oct  7 05:31:59 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Shared state, messaging and membership information between members of a distributed group
+
+</body>
+</html>

Propchange: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-groups/src/test/eclipse-resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/test/eclipse-resources/log4j.properties?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/test/eclipse-resources/log4j.properties (added)
+++ activemq/trunk/activemq-groups/src/test/eclipse-resources/log4j.properties Tue Oct  7 05:31:59 2008
@@ -0,0 +1,37 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+#
+# The logging properties used for Eclipse testing. We want to see log output (INFO and above) on the console.
+#
+log4j.rootLogger=INFO, out
+
+
+
+# Console appender (attached to the root logger above)
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender (defined here but not attached to the root logger)
+log4j.appender.fout=org.apache.log4j.FileAppender
+log4j.appender.fout.layout=org.apache.log4j.PatternLayout
+log4j.appender.fout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.fout.file=target/amq-testlog.log
+log4j.appender.fout.append=true
+

Propchange: activemq/trunk/activemq-groups/src/test/eclipse-resources/log4j.properties
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupMemberTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupMemberTest.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupMemberTest.java (added)
+++ activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupMemberTest.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+
+public class GroupMemberTest extends TestCase {
+    protected BrokerService broker;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+    /**
+     * Checks that, out of ten members, the single member given a coordinator
+     * weight of 10 is the one returned by <code>selectCordinator</code>.
+     *
+     * @throws Exception
+     */
+    public void testCoordinatorSelection() throws Exception {
+        Group group = new Group(null, "");
+        List<Member> members = new ArrayList<Member>();
+        final int number = 10;
+        Member chosen = null;
+        for (int i = 0; i < number; i++) {
+            Member m = new Member("group" + i);
+            m.setId("" + i);
+            if (number / 2 == i) {
+                m.setCoordinatorWeight(10);
+                chosen = m;
+            }
+            members.add(m);
+        }
+        Member selected = group.selectCordinator(members);
+        // expected value goes first so that a failure message reads correctly
+        assertEquals(chosen, selected);
+    }
+
+    /**
+     * Starts ten groups against the broker, each one requiring a minimum
+     * group size one larger than the previous, asks every group whether it
+     * is the coordinator and stops them all again.
+     *
+     * @throws Exception
+     */
+    public void testGroup() throws Exception {
+        final int number = 10;
+        List<Group> groupMaps = new ArrayList<Group>();
+        try {
+            ConnectionFactory factory = createConnectionFactory();
+            for (int i = 0; i < number; i++) {
+                Connection connection = factory.createConnection();
+                Group map = new Group(connection, "map" + i);
+                map.setHeartBeatInterval(200);
+                map.setMinimumGroupSize(i + 1);
+                map.start();
+                groupMaps.add(map);
+            }
+            // NOTE(review): the coordinators were counted here before but the
+            // count was never asserted; the calls are kept, the dead counter is
+            // not. testWeightedGroup asserts on the coordinator after a delay.
+            for (Group map : groupMaps) {
+                map.isCoordinator();
+            }
+        } finally {
+            stopAll(groupMaps);
+        }
+    }
+
+    /**
+     * Starts ten groups, one of them with a coordinator weight of 10, and
+     * checks after two seconds that exactly one group reports itself as
+     * coordinator and that it is the weighted one.
+     *
+     * @throws Exception
+     */
+    public void testWeightedGroup() throws Exception {
+        final int number = 10;
+        List<Group> groupMaps = new ArrayList<Group>();
+        try {
+            Group weighted = null;
+            ConnectionFactory factory = createConnectionFactory();
+            for (int i = 0; i < number; i++) {
+                Connection connection = factory.createConnection();
+                Group map = new Group(connection, "map" + i);
+                if (i == number / 2) {
+                    map.setCoordinatorWeight(10);
+                    weighted = map;
+                }
+                map.setMinimumGroupSize(i + 1);
+                map.start();
+                groupMaps.add(map);
+            }
+            Thread.sleep(2000);
+            int coordinator = 0;
+            Group groupCoordinator = null;
+            for (Group map : groupMaps) {
+                if (map.isCoordinator()) {
+                    coordinator++;
+                    groupCoordinator = map;
+                }
+            }
+
+            assertNotNull(groupCoordinator);
+            assertEquals(1, coordinator);
+            assertEquals(weighted.getName(), groupCoordinator.getName());
+        } finally {
+            // stop the groups even when an assertion above failed
+            stopAll(groupMaps);
+        }
+    }
+
+    /**
+     * Stops the given groups in list order.
+     */
+    private static void stopAll(List<Group> groups) throws Exception {
+        for (Group group : groups) {
+            group.stop();
+        }
+    }
+
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+            // drop the reference so the stopped broker can be collected
+            broker = null;
+        }
+    }
+
+    /**
+     * @return a connection factory for the default broker URL
+     * @throws Exception
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
+    }
+
+    /**
+     * @return a configured and started broker
+     * @throws Exception
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    /**
+     * Makes the broker non-persistent, binds it to {@link #bindAddress} and
+     * has it delete all messages on startup.
+     *
+     * @param answer the broker to configure
+     * @throws Exception
+     */
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}
+

Propchange: activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupMemberTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupMessageTest.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupMessageTest.java (added)
+++ activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupMessageTest.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+public class GroupMessageTest extends TestCase {
+    protected BrokerService broker;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+    /**
+     * Broadcasts one message from the first of ten groups and expects every
+     * group's listener to be called within five seconds.
+     *
+     * @throws Exception
+     */
+    public void testGroupBroadcast() throws Exception {
+        final int number = 10;
+        final AtomicInteger count = new AtomicInteger();
+        List<Group> groups = new ArrayList<Group>();
+        try {
+            ConnectionFactory factory = createConnectionFactory();
+            for (int i = 0; i < number; i++) {
+                Connection connection = factory.createConnection();
+                Group group = new Group(connection, "group" + i);
+                group.setMinimumGroupSize(i+1);
+                group.start();
+                groups.add(group);
+                group.addGroupMessageListener(new GroupMessageListener() {
+                    public void messageDelivered(Member sender, String replyId,
+                            Object message) {
+                        synchronized (count) {
+                            if (count.incrementAndGet() == number) {
+                                count.notifyAll();
+                            }
+                        }
+                    }
+                });
+            }
+            groups.get(0).broadcastMessage("hello");
+            awaitAtLeast(count, number, 5000);
+            assertEquals(number, count.get());
+        } finally {
+            stopAll(groups);
+        }
+    }
+
+    /**
+     * Sends one message to the group as a whole and expects exactly one of
+     * the ten listeners to receive it.
+     *
+     * @throws Exception
+     */
+    public void testsendMessage() throws Exception {
+        final int number = 10;
+        final AtomicInteger count = new AtomicInteger();
+        List<Group> groups = new ArrayList<Group>();
+        try {
+            ConnectionFactory factory = createConnectionFactory();
+            for (int i = 0; i < number; i++) {
+                Connection connection = factory.createConnection();
+                Group group = new Group(connection, "group" + i);
+                group.setMinimumGroupSize(i+1);
+                group.start();
+                groups.add(group);
+                group.addGroupMessageListener(new GroupMessageListener() {
+                    public void messageDelivered(Member sender, String replyId,
+                            Object message) {
+                        synchronized (count) {
+                            count.incrementAndGet();
+                            count.notifyAll();
+                        }
+                    }
+                });
+            }
+            groups.get(0).sendMessage("hello");
+            awaitAtLeast(count, 1, 5000);
+            // wait a while to check that only one got it
+            Thread.sleep(2000);
+            assertEquals(1, count.get());
+        } finally {
+            stopAll(groups);
+        }
+    }
+
+    /**
+     * Sends a message from group2 to the member named "group1" and expects
+     * group1's listener to be called within five seconds.
+     *
+     * @throws Exception
+     */
+    public void testSendToSingleMember() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        Connection connection1 = factory.createConnection();
+        Connection connection2 = factory.createConnection();
+        final AtomicBoolean called = new AtomicBoolean();
+        List<Group> started = new ArrayList<Group>();
+        try {
+            Group group1 = new Group(connection1, "group1");
+            group1.addGroupMessageListener(new GroupMessageListener() {
+                public void messageDelivered(Member sender, String replyId,
+                        Object message) {
+                    synchronized (called) {
+                        called.set(true);
+                        called.notifyAll();
+                    }
+                }
+            });
+            group1.start();
+            started.add(group1);
+            Group group2 = new Group(connection2, "group2");
+            group2.setMinimumGroupSize(2);
+            group2.start();
+            started.add(group2);
+            Member member1 = group2.getMemberByName("group1");
+            group2.sendMessage(member1, "hello");
+            awaitTrue(called, 5000);
+            assertTrue(called.get());
+        } finally {
+            stopAll(started);
+        }
+    }
+
+    /**
+     * Ping-pongs requests and replies between two groups: group1 answers
+     * each message it receives with the next queued reply, group2 answers
+     * each message it receives with the next queued request. When group2
+     * receives a message after its requests have run out it flags the test
+     * as finished, which must happen within ten seconds.
+     *
+     * @throws Exception
+     */
+    public void testSendRequestReply() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        Connection connection1 = factory.createConnection();
+        Connection connection2 = factory.createConnection();
+        final int number = 1000;
+        final List<String> requests = new ArrayList<String>();
+        final List<String> replies = new ArrayList<String>();
+        for (int i = 0; i < number; i++) {
+            requests.add("request" + i);
+            replies.add("reply" + i);
+        }
+        final AtomicBoolean finished = new AtomicBoolean();
+        List<Group> started = new ArrayList<Group>();
+        try {
+            final Group group1 = new Group(connection1, "group1");
+            group1.addGroupMessageListener(new GroupMessageListener() {
+                public void messageDelivered(Member sender, String replyId,
+                        Object message) {
+                    if (!replies.isEmpty()) {
+                        String reply = replies.remove(0);
+                        try {
+                            group1.sendMessageResponse(sender, replyId, reply);
+                        } catch (JMSException e) {
+                            // the exchange stalls and the test then fails on its timeout
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            });
+            group1.start();
+            started.add(group1);
+            final Group group2 = new Group(connection2, "group2");
+            group2.setMinimumGroupSize(2);
+            group2.addGroupMessageListener(new GroupMessageListener() {
+                public void messageDelivered(Member sender, String replyId,
+                        Object message) {
+                    if (!requests.isEmpty()) {
+                        String request = requests.remove(0);
+                        try {
+                            group2.sendMessage(sender, request);
+                        } catch (JMSException e) {
+                            // the exchange stalls and the test then fails on its timeout
+                            e.printStackTrace();
+                        }
+                    }else {
+                        synchronized (finished) {
+                            finished.set(true);
+                            finished.notifyAll();
+                        }
+                    }
+                }
+            });
+            group2.start();
+            started.add(group2);
+            Member member1 = group2.getMemberByName("group1");
+            group2.sendMessage(member1, requests.remove(0));
+            awaitTrue(finished, 10000);
+            assertTrue(finished.get());
+        } finally {
+            stopAll(started);
+        }
+    }
+
+    /**
+     * Blocks on the counter's monitor until it reaches at least the expected
+     * value or the timeout elapses. Loops because wait() may wake spuriously.
+     */
+    private static void awaitAtLeast(AtomicInteger counter, int expected,
+            long timeoutMillis) throws InterruptedException {
+        final long deadline = System.currentTimeMillis() + timeoutMillis;
+        synchronized (counter) {
+            for (long left = timeoutMillis; counter.get() < expected && left > 0;
+                    left = deadline - System.currentTimeMillis()) {
+                counter.wait(left);
+            }
+        }
+    }
+
+    /**
+     * Blocks on the flag's monitor until it is set or the timeout elapses.
+     * Loops because wait() may wake spuriously.
+     */
+    private static void awaitTrue(AtomicBoolean flag, long timeoutMillis)
+            throws InterruptedException {
+        final long deadline = System.currentTimeMillis() + timeoutMillis;
+        synchronized (flag) {
+            for (long left = timeoutMillis; !flag.get() && left > 0;
+                    left = deadline - System.currentTimeMillis()) {
+                flag.wait(left);
+            }
+        }
+    }
+
+    /**
+     * Stops the given groups in list order.
+     */
+    private static void stopAll(List<Group> groups) throws Exception {
+        for (Group group : groups) {
+            group.stop();
+        }
+    }
+
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    /**
+     * @return a connection factory for the default broker URL
+     * @throws Exception
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory()
+            throws Exception {
+        return new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
+    }
+
+    /**
+     * @return a configured and started broker
+     * @throws Exception
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    /**
+     * Makes the broker non-persistent, binds it to {@link #bindAddress} and
+     * has it delete all messages on startup.
+     *
+     * @param answer the broker to configure
+     * @throws Exception
+     */
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}

Propchange: activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupStateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupStateTest.java?rev=702452&view=auto
==============================================================================
--- activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupStateTest.java (added)
+++ activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupStateTest.java Tue Oct  7 05:31:59 2008
@@ -0,0 +1,548 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activegroups;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+
+public class GroupStateTest extends TestCase {
+    protected BrokerService broker;
+    protected Connection connection1;
+    protected Connection connection2;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+    /**
+     * Test method for
+     * {@link Group#addMemberChangedListener(MemberChangedListener)}.
+     * @throws Exception 
+     */
+    public void testAddMemberChangedListener() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+        Group map1 = new Group(connection1,"map1");
+        map1.addMemberChangedListener(new MemberChangedListener(){
+
+            public void memberStarted(Member member) {
+                synchronized(counter) {
+                    counter.incrementAndGet();
+                    counter.notifyAll();
+                }
+                
+            }
+
+            public void memberStopped(Member member) {
+                synchronized(counter) {
+                    counter.decrementAndGet();
+                    counter.notifyAll();
+                }
+            }
+            
+        });
+        map1.start();
+        synchronized(counter) {
+            if (counter.get()<1) {
+                counter.wait(5000);
+            }
+        }
+        assertEquals(1, counter.get());
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        synchronized(counter) {
+            if (counter.get()<2) {
+                counter.wait(5000);
+            }
+        }
+        assertEquals(2, counter.get());
+        map2.stop();
+        synchronized(counter) {
+            if (counter.get()>=2) {
+                counter.wait(Group.DEFAULT_HEART_BEAT_INTERVAL*3);
+            }
+        }
+        assertEquals(1, counter.get());
+        map1.stop();
+    }
+
+    /**
+     * Test method for
+     * {@link Group#addMapChangedListener}.
+     * @throws Exception 
+     */
+    public void testAddMapChangedListener() throws Exception {
+        final AtomicBoolean called1 = new AtomicBoolean();
+        final AtomicBoolean called2 = new AtomicBoolean();
+        
+        Group map1 = new Group(connection1,"map1");
+        
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called1) {
+                    called1.set(true);
+                    called1.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        
+        Group map2 = new Group(connection2,"map2");
+        
+        map2.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called2) {
+                    called2.set(true);
+                    called2.notifyAll();
+                }
+            }
+        });
+        map2.start();
+        
+        
+        map1.put("test", "blob");
+        synchronized(called1) {
+            if (!called1.get()) {
+               called1.wait(5000); 
+            }
+        }
+        synchronized(called2) {
+            if (!called2.get()) {
+               called2.wait(5000); 
+            }
+        }
+        assertTrue(called1.get());
+        assertTrue(called2.get());
+        map1.stop();
+        map2.stop();
+    }
+   
+    /**
+     * With always-lock enabled on map2, a key put by map2 must not be
+     * writable from map1.
+     *
+     * @throws Exception
+     */
+    public void testGetImplicitWriteLock() throws Exception {
+        Group map1 = new Group(connection1, "map1");
+        map1.start();
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        map2.put("test", "foo");
+        try {
+            map1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (GroupUpdateException expected) {
+            // expected: the put by map2 locked the key
+        }
+        map1.stop();
+        map2.stop();
+    }
+    
+    /**
+     * With always-lock enabled and a lock time-to-live of 1000 on map2, a key
+     * put by map2 is rejected for map1 at first, but a put from map1 two
+     * seconds later must go through.
+     *
+     * @throws Exception
+     */
+    public void testExpireImplicitWriteLock() throws Exception {
+        Group map1 = new Group(connection1, "map1");
+        map1.start();
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setLockTimeToLive(1000);
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        map2.put("test", "foo");
+        try {
+            map1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (GroupUpdateException expected) {
+            // expected: the lock taken by map2's put has not expired yet
+        }
+        // sleep past the configured lock time-to-live
+        Thread.sleep(2000);
+        map1.put("test", "bah");
+        map1.stop();
+        map2.stop();
+    }
+    
+    /**
+     * A key locked by map2's put must become writable for map1 once map2
+     * has stopped.
+     * <p>
+     * The method name does not start with "test", so JUnit 3 does not run it.
+     * NOTE(review): the reason it was disabled is not recorded here.
+     *
+     * @throws Exception
+     */
+    public void XtestExpireImplicitLockOnExit() throws Exception {
+        Group map1 = new Group(connection1, "map1");
+        map1.start();
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        map2.put("test", "foo");
+        try {
+            map1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (GroupUpdateException expected) {
+            // expected: map2 is still running and holds the lock
+        }
+        map2.stop();
+        map1.put("test", "bah");
+        map1.stop();
+
+    }
+    
+    /**
+     * While map2 holds an explicit lock on a key, a put from map1 must be
+     * rejected; after map2 unlocks and map1 locks the key, a lock attempt
+     * from map2 must be rejected in turn.
+     *
+     * @throws Exception
+     */
+    public void testGetExplicitWriteLock() throws Exception {
+        Group map1 = new Group(connection1, "map1");
+        map1.setAlwaysLock(true);
+        map1.start();
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        map2.put("test", "foo");
+        map2.lock("test");
+        try {
+            map1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (GroupUpdateException expected) {
+            // expected: map2 holds the lock on "test"
+        }
+        map2.unlock("test");
+        map1.lock("test");
+        try {
+            map2.lock("test");
+            fail("Should have thrown an exception!");
+        } catch (GroupUpdateException expected) {
+            // expected: map1 now holds the lock on "test"
+        }
+        map1.stop();
+        map2.stop();
+    }
+    
+    
+
+    /**
+     * Test method for {@link org.apache.activemq.group.Group#clear()}.
+     * 
+     * @throws Exception
+     */
+    public void testClear() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+            
+            public void mapRemove(Member owner, Object key, Object value,boolean expired) {        
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        map2.put("test","foo");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(map1.isEmpty()==false);
+        map2.clear();
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(map1.isEmpty());
+        map1.stop();
+        map2.stop();
+    }
+
+    /**
+     * Test a new map is populated for existing values
+     */
+    public void testMapUpdatedOnStart() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        
+        map1.start();
+        map1.put("test", "foo");
+        Group map2 = new Group(connection2,"map2");
+        map2.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map2.start();
+       
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(map2.containsKey("test"));
+        assertTrue(map2.containsValue("foo"));
+        map1.stop();
+        map2.stop();
+    }
+   
+    public void testContainsKey() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        map2.put("test","foo");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(map1.containsKey("test"));
+        map1.stop();
+        map2.stop();
+    }
+
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.Group#containsValue(java.lang.Object)}.
+     * @throws Exception 
+     */
+    public void testContainsValue() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        map2.put("test","foo");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(map1.containsValue("foo"));
+        map1.stop();
+        map2.stop();
+    }
+
+    /**
+     * Test method for {@link org.apache.activemq.group.GroupMap#entrySet()}.
+     * @throws Exception 
+     */
+    
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.Group#get(java.lang.Object)}.
+     * @throws Exception 
+     */
+    public void testGet() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        map2.put("test","foo");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        assertTrue(map1.get("test").equals("foo"));
+        map1.stop();
+        map2.stop();
+    }
+    
+    /**
+     * Checks the return value of {@code put}: {@code null} for a key that had
+     * no value yet, and the previous value when the key is overwritten.
+     *
+     * @throws Exception if a group cannot be started, updated or stopped
+     */
+    public void testPut() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        Object value = map1.put("foo", "blob");
+        assertNull(value);
+        value = map1.put("foo", "blah");
+        // expected value first, so a failure message reads the right way round
+        assertEquals("blob", value);
+        map1.stop();
+        map2.stop();
+    }
+
+    
+    
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.Group#remove(java.lang.Object)}.
+     */
+    public void testRemove() throws Exception{
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+            
+            public void mapRemove(Member owner, Object key, Object value,boolean expired) {        
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        map2.put("test","foo");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(map1.isEmpty()==false);
+        map2.remove("test");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(map1.isEmpty());
+        
+        map1.stop();
+        map2.stop();
+    }
+    
+    public void testExpire() throws Exception{
+        final AtomicBoolean called1 = new AtomicBoolean();
+        final AtomicBoolean called2 = new AtomicBoolean();
+        
+        Group map1 = new Group(connection1,"map1");
+        map1.setTimeToLive(1000);
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapRemove(Member owner, Object key, Object value,boolean expired) {        
+                synchronized(called1) {
+                    called1.set(expired);
+                    called1.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        
+        Group map2 = new Group(connection2,"map2");
+        
+        map2.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapRemove(Member owner, Object key, Object value,boolean expired) {        
+                synchronized(called2) {
+                    called2.set(expired);
+                    called2.notifyAll();
+                }
+            }
+        });
+        map2.start();
+        
+        
+        map1.put("test", "blob");
+        synchronized(called1) {
+            if (!called1.get()) {
+               called1.wait(5000); 
+            }
+        }
+        synchronized(called2) {
+            if (!called2.get()) {
+               called2.wait(5000); 
+            }
+        }
+        assertTrue(called1.get());
+        assertTrue(called2.get());
+        map1.stop();
+        map2.stop();
+    }
+
+    /**
+     * Prepares the fixture: creates a broker when none is set yet, then opens
+     * and starts two connections from the same connection factory, and
+     * finally runs the superclass set-up.
+     *
+     * @throws Exception if the broker or either connection cannot be created
+     *         or started
+     */
+    protected void setUp() throws Exception {
+        // only create a broker when the field has not been populated already
+        if (broker == null) {
+            broker = createBroker();
+        }
+        ConnectionFactory factory = createConnectionFactory();
+        connection1 = factory.createConnection();
+        connection1.start();
+        connection2 = factory.createConnection();
+        connection2.start();
+        // NOTE(review): super.setUp() runs last rather than first - confirm
+        // this ordering is intentional
+        super.setUp();
+    }
+
+    /**
+     * Releases the fixture: runs the superclass tear-down, closes both
+     * connections and stops the broker.
+     * <p>
+     * Each step runs in a {@code finally} block so that a failure in one step
+     * does not leave the later resources open.
+     *
+     * @throws Exception if the superclass tear-down, closing a connection or
+     *         stopping the broker fails
+     */
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            try {
+                if (connection1 != null) {
+                    connection1.close();
+                }
+            } finally {
+                try {
+                    if (connection2 != null) {
+                        connection2.close();
+                    }
+                } finally {
+                    if (broker != null) {
+                        broker.stop();
+                        // setUp() only creates a broker when the field is
+                        // null, so clear it to avoid reusing a stopped one
+                        broker = null;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the connection factory used for the test connections; it points
+     * at {@code ActiveMQConnection.DEFAULT_BROKER_URL}.
+     *
+     * @return a new connection factory
+     * @throws Exception declared so that overriding methods may throw
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory()throws Exception {
+        return new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    /**
+     * Applies the test configuration to the given broker: persistence is
+     * switched off, a connector is added on {@code bindAddress}, and
+     * delete-all-messages-on-startup is switched on.
+     *
+     * @param answer the broker to configure
+     * @throws Exception if applying the configuration fails
+     */
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+        // bindAddress is declared outside this method
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}

Propchange: activemq/trunk/activemq-groups/src/test/java/org/apache/activegroups/GroupStateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message