Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 45815 invoked from network); 21 Nov 2008 20:45:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 21 Nov 2008 20:45:20 -0000 Received: (qmail 74682 invoked by uid 500); 21 Nov 2008 20:45:29 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 74668 invoked by uid 500); 21 Nov 2008 20:45:29 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 74659 invoked by uid 99); 21 Nov 2008 20:45:29 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Nov 2008 12:45:29 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Nov 2008 20:44:01 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D830C23889BB; Fri, 21 Nov 2008 12:44:46 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r719706 [4/6] - in /activemq/activemq-blaze: ./ branches/ tags/ trunk/ trunk/src/ trunk/src/main/ trunk/src/main/java/ trunk/src/main/java/org/ trunk/src/main/java/org/apache/ trunk/src/main/java/org/apache/activeblaze/ trunk/src/main/java/... Date: Fri, 21 Nov 2008 20:44:43 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081121204446.D830C23889BB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,402 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.group; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.activeblaze.BaseService; +import org.apache.activeblaze.wire.DestinationData; +import org.apache.activeblaze.wire.MemberData; +import org.apache.activeblaze.wire.MessageType; +import org.apache.activemq.protobuf.Buffer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Maintains members of a group + * + */ +public class Group extends BaseService { + static final Log LOG = LogFactory.getLog(Group.class); + final BlazeGroupChannelImpl channel; + private final BlazeGroupConfiguration configuration; + private ScheduledExecutorService heartBeatService; + private ScheduledExecutorService checkMembershipService; + protected Map members = new ConcurrentHashMap(); + private List membershipListeners = new CopyOnWriteArrayList(); + private final Map> queueMap = new ConcurrentHashMap>(); + private final Map> topicMap = new ConcurrentHashMap>(); + + /** + * Constructor + * + * @param local + * @param channel + * @param transport + * @param config + */ + protected Group(BlazeGroupChannelImpl channel) { + this.channel = channel; + this.configuration = channel.getGroupConfiguration(); + } + + /** + * @return the Member of the Channel + * @throws Exception + */ + public MemberImpl getLocalMember() throws Exception { + return this.channel.getLocalMember(); + } + + void updateLocal(MemberImpl member) { + this.members.put(member.getId(), member); + } + + /** + * @return the id of the local channel associated with this group + */ + public String getId() { + return this.channel.getId(); + } + + /** + * @return the name of the local channel + */ + public String getName() { + return this.channel.getName(); + } + + /** + * @return the configuration associated with this channel + */ + public BlazeGroupConfiguration getConfiguration() { + return this.configuration; + } + + void addMemberChangedListener(MemberChangedListener l) { + this.membershipListeners.add(l); + } + + void removeMemberChangedListener(MemberChangedListener l) { + this.membershipListeners.add(l); + } + + /** + * @return the members + */ + Set getMembers() { + return new HashSet(this.members.values()); + } + + /** + * @return the members from this group + */ + public Set getMembersImpl() { + return new HashSet(this.members.values()); + } + + /** + * Get a member by its unique id + * + * @param id + * @return + */ + Member getMemberById(String id) { + return this.members.get(id); + } + + /** + * Return a member of the Group with the matching name + * + * @param name + * @return + */ + Member getMemberByName(String name) { + if (name != null) { + for (Member member : this.members.values()) { + if (member.getName().equals(name)) { + return member; + } + } + } + return null; + } + + /** + * Will wait for a member to advertise itself if not available + * @param name + * @param timeout + * @return the member or null + * @throws InterruptedException + */ + public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException { + Member result = null; + long deadline = 0; + if (timeout > 0) { + deadline = System.currentTimeMillis() + timeout; + } + while (true) { + result = getMemberByName(name); + if (result == null) { + synchronized(this.members) { + this.members.wait(timeout); + } + if (timeout > 0) { + timeout = (int) Math.max(deadline - System.currentTimeMillis(), 0l); + } + }else { + break; + } + } + return result; + } + + /** + * @return + * @throws Exception + * @see org.apache.activeblaze.Service#init() + */ + public boolean init() throws Exception { + boolean result = super.init(); + if (result) { + this.members.put(this.channel.getId(), this.channel.getLocalMember()); + } + return result; + } + + /** + * @return + * @throws Exception + * @see org.apache.activeblaze.Service#shutDown() + */ + public boolean shutDown() throws Exception { + boolean result = super.shutDown(); + if (result) { + } + return result; + } + + /** + * @return + * @throws Exception + * @see org.apache.activeblaze.Service#start() + */ + public boolean start() throws Exception { + boolean result = super.start(); + if (result) { + this.heartBeatService = Executors.newScheduledThreadPool(1, new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + } + }); + Runnable heartbeat = new Runnable() { + public void run() { + try { + broadcastHeartBeat(getLocalMember()); + } catch (Exception e) { + LOG.error("Failed to send heartbeat",e); + } + } + }; + heartbeat.run(); + int interval = this.configuration.getHeartBeatInterval(); + this.heartBeatService.scheduleAtFixedRate(heartbeat, interval, interval, TimeUnit.MILLISECONDS); + this.checkMembershipService = Executors.newScheduledThreadPool(1, new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + } + }); + Runnable checkMembership = new Runnable() { + public void run() { + if (isStarted()) { + try { + checkMembership(); + } catch (Exception e) { + LOG.error("Failed to checkMembership", e); + } + } + } + }; + this.checkMembershipService.scheduleAtFixedRate(checkMembership, interval / 3, interval / 2, + TimeUnit.MILLISECONDS); + } + return result; + } + + /** + * @return + * @throws Exception + * @see org.apache.activeblaze.Service#stop() + */ + public boolean stop() throws Exception { + boolean result = super.stop(); + if (result) { + if (this.heartBeatService != null) { + this.heartBeatService.shutdown(); + } + if (this.checkMembershipService != null) { + this.checkMembershipService.shutdown(); + } + } + return result; + } + + /** + * Process a new member + * + * @param data + * @throws Exception + * @return Member if a new member else null + */ + protected MemberImpl processMember(MemberData data) throws Exception { + MemberImpl result = null; + MemberImpl old = null; + MemberImpl member = new MemberImpl(data); + if (!member.getId().equals(getLocalMember().getId())) { + member.setTimeStamp(System.currentTimeMillis()); + if ((old = this.members.put(member.getId(), member)) == null) { + processMemberStarted(member); + if (!member.getId().equals(this.channel.getId())) { + this.channel.sendMessage(member.getAddress(), MessageType.MEMBER_DATA, this.channel + .getLocalMember().getData()); + } + result = member; + } else { + if (data.getDestinationsChanged()) { + processMemberUpdate(old, member); + } + } + } + return result; + } + + private void fireMemberStarted(Member member) { + LOG.debug(this.channel.getName() + " Member started " + member); + for (MemberChangedListener l : this.membershipListeners) { + l.memberStarted(member); + } + } + + private void fireMemberStopped(Member member) { + LOG.debug(this.channel.getName() + " Member stopped " + member); + for (MemberChangedListener l : this.membershipListeners) { + l.memberStopped(member); + } + } + + void checkMembership() throws Exception { + if (isStarted()) { + long checkTime = System.currentTimeMillis() - this.configuration.getHeartBeatInterval(); + for (MemberImpl member : this.members.values()) { + if (!member.getId().equals(getId()) && member.getTimeStamp() < checkTime) { + LOG.debug(getId() +" Member timestamp expired " + member); + this.members.remove(member.getId()); + processMemberStopped(member); + } + } + } + } + + protected void processMemberStarted(MemberImpl member) throws Exception { + processDestinationsForStarted(member); + fireMemberStarted(member); + synchronized(this.members) { + this.members.notifyAll(); + } + } + + protected void processMemberStopped(MemberImpl member) throws Exception { + fireMemberStopped(member); + processDestinationsForStopped(member); + } + + private void processDestinationsForStarted(MemberImpl member) { + List destList = member.getData().getDestinationList(); + for (DestinationData dest : destList) { + Buffer key = dest.getDestination(); + Map> map = null; + if (dest.getTopic()) { + map = this.topicMap; + } else { + map = this.queueMap; + } + List members = map.get(key); + if (members == null) { + members = new CopyOnWriteArrayList(); + map.put(key, members); + } + members.add(member); + } + } + + private void processDestinationsForStopped(MemberImpl member) { + List destList = member.getData().getDestinationList(); + for (DestinationData dest : destList) { + Buffer key = dest.getDestination(); + Map> map = null; + if (dest.getTopic()) { + map = this.topicMap; + } else { + map = this.queueMap; + } + List members = map.get(key); + if (members != null) { + members.remove(member); + if (members.isEmpty()) { + map.remove(key); + } + } + } + } + + protected void processMemberUpdate(MemberImpl oldMember, MemberImpl newMember) throws Exception { + processDestinationsForStopped(oldMember); + processDestinationsForStarted(newMember); + } + + /** + * @return the queueMap + */ + protected Map> getQueueMap() { + return this.queueMap; + } + + /** + * @return the topicMap + */ + protected Map> getTopicMap() { + return this.topicMap; + } + + protected void broadcastHeartBeat(MemberImpl local) throws Exception { + if (isStarted()) { + Group.this.channel.broadcastMessage(MessageType.MEMBER_DATA, local.getData()); + } + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.group; + + +/** + *A Member holds information about a member of the group + * + */ +public interface Member { + + /** + * @return the name + */ + public String getName(); + + /** + * @return the id + */ + public String getId(); + + + /** + * @return the startTime + */ + public long getStartTime(); + + /** + * @return the timeStamp + */ + long getTimeStamp(); + + /** + * Set the timestamp + * @param value + */ + void setTimeStamp(long value); + /** + * @return the inbox destination + */ + public String getInBoxDestination(); + + + /** + * @return the coordinatorWeight + */ + public long getCoordinatorWeight(); + +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activeblaze.group; + + +/** + * A listener for membership changes to a group + * + */ +public interface MemberChangedListener { + + /** + * Notification a member has started + * @param member + */ + void memberStarted(Member member); + + /** + * Notification a member has stopped + * @param member + */ + void memberStopped(Member member); +} \ No newline at end of file Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.group; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import org.apache.activeblaze.wire.MemberData; +import org.apache.activemq.protobuf.Buffer; + +/** + * Implementation of a Member + * + */ +public class MemberImpl implements Member { + private final MemberData data; + private final InetSocketAddress socketAddress; + private final Buffer socketAddressAsBuffer; + + + /** + * Default constructor + * @param id + * @param name + * @param coordinatorWeight + * @param localURI + * @throws Exception + */ + public MemberImpl(String id,String name,long coordinatorWeight,URI localURI) throws Exception { + InetAddress addr = InetAddress.getByName(localURI.getHost()); + this.socketAddress = new InetSocketAddress(addr,localURI.getPort()); + this.socketAddressAsBuffer=new Buffer(this.socketAddress.toString()); + this.data = new MemberData(); + this.data.setId(id); + this.data.setName(name); + this.data.setCoordinatorWeight(coordinatorWeight); + this.data.setStartTime(System.currentTimeMillis()); + this.data.setInetAddress(new Buffer(addr.getHostAddress())); + this.data.setPort(localURI.getPort()); + + + } + /** + * Constructor + * @param data + * @throws Exception + */ + public MemberImpl(MemberData data) throws Exception { + this.data = data; + InetAddress addr = InetAddress.getByName(data.getInetAddress().toStringUtf8()); + this.socketAddress= new InetSocketAddress(addr,data.getPort()); + this.socketAddressAsBuffer=new Buffer(this.socketAddress.toString()); + } + + /** + * @return the name + */ + public String getName() { + return this.data.getName(); + } + + /** + * @return the id + */ + public String getId() { + return this.data.getId(); + } + + void setId(String id) { + this.data.setId(id); + } + + + /** + * @return the startTime + */ + public long getStartTime() { + return this.data.getStartTime(); + } + + + /** + * @return the inbox destination + */ + public String getInBoxDestination() { + return this.data.getId(); + } + + /** + * @return the SocketAddress for this member + */ + public InetSocketAddress getAddress () { + return this.socketAddress; + + } + + /** + * @return address as a Buffer + */ + public Buffer getAddressAsBuffer() { + return this.socketAddressAsBuffer; + } + + /** + * @return the timeStamp + */ + public long getTimeStamp() { + return this.data.getTimeStamp(); + } + + /** + * Set the timestamp + * @param value + */ + public void setTimeStamp(long value) { + this.data.setTimeStamp(value); + } + + + /** + * @return the coordinatorWeight + */ + public long getCoordinatorWeight() { + return this.data.getCoordinatorWeight(); + } + + + public String toString() { + return this.data.getName()+"["+this.data.getId()+"]"; + } + + + public int hashCode() { + return this.data.getId().hashCode(); + } + + public boolean equals(Object obj) { + boolean result = false; + if (obj instanceof MemberImpl) { + MemberImpl other = (MemberImpl)obj; + result = this.data.getId().equals(other.data.getId()); + } + return result; + } + + /** + * @return the data + */ + public MemberData getData() { + return this.data; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.group; + +import org.apache.activemq.protobuf.Buffer; + +/** + * request callback + * + */ +public interface RequestCallback { + /** + * Optionally called when a request is finished + * @param id + */ + void finished(Buffer id); +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.group; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.protobuf.Message; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @author state on a request + * + */ +class SendRequest { + private static final Log LOG = LogFactory.getLog(SendRequest.class); + private final AtomicBoolean done = new AtomicBoolean(); + private Message response; + private RequestCallback callback; + + Object get(long timeout) { + synchronized (this.done) { + if (this.done.get() == false && this.response == null) { + try { + this.done.wait(timeout); + } catch (InterruptedException e) { + LOG.warn("Interrupted in get("+timeout+")",e); + } + } + } + return this.response; + } + + void put(Buffer id,Message response) { + this.response = response; + cancel(); + RequestCallback callback = this.callback; + if (callback != null) { + callback.finished(id); + } + } + + void cancel() { + this.done.set(true); + synchronized (this.done) { + this.done.notifyAll(); + } + } + + void setCallback(RequestCallback callback) { + this.callback=callback; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html Fri Nov 21 12:44:40 2008 @@ -0,0 +1,26 @@ + + + + + + +Group channel for communicating true point-to-point using unicast and +Group membership + + + \ No newline at end of file Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.destination; + +import org.apache.activemq.protobuf.Buffer; + +/** + * Matches a Destination Subject to wildcards + * + */ +public final class DestinationMatch { + static final byte MATCH_ELEMENT = '*'; + static final byte MATCH_ALL = '>'; + static final byte[] DELIMETERS = { '.', '/', '|' }; + + /** + * See if the destination matches with wild cards + * + * @param destination + * @param match + * @return true if its a match + */ + public static boolean isMatch(String destination, String match) { + return isMatch(new Buffer(destination), new Buffer(match)); + } + /** + * See if the destination matches with wild cards + * + * @param destination + * @param match + * @return true if its a match + */ + public static boolean isMatch(Buffer destination, String match) { + return isMatch(destination, new Buffer(match)); + } + + /** + * See if the destination matches with wild cards + * + * @param destination + * @param match + * @return true if its a match + */ + public static boolean isMatch(String destination, Buffer match) { + return isMatch(new Buffer(destination), match); + } + + /** + * See if the destination matches with wild cards + * + * @param destination + * @param match + * @return + */ + public static boolean isMatch(Buffer destination, Buffer match) { + boolean result = true; + boolean matchAll = false; + if (destination == null && match == null) { + return true; + } + if (destination == null || match == null) { + return false; + } + int destinationOffset = 0; + int matchOffset = 0; + while (destinationOffset < destination.length + && matchOffset < match.length) { + byte matchByte = match.byteAt(matchOffset); + byte destinationByte = destination.byteAt(destinationOffset); + if (matchByte != destinationByte || matchByte == MATCH_ALL || destinationByte == MATCH_ALL + || matchByte == MATCH_ELEMENT || destinationByte == MATCH_ELEMENT) { + if (matchByte == MATCH_ALL || destinationByte == MATCH_ALL) { + matchAll = true; + break; + } else if ((matchByte == MATCH_ELEMENT || destinationByte == MATCH_ELEMENT) + && (isMatchElement(match, matchOffset, match.length) || isMatchElement(destination, + destinationOffset, destination.length))) { + if (containsDelimeter(destination, destinationOffset, destination.length) == false + && containsDelimeter(match, matchOffset, match.length) == false) { + break; + } else { + matchOffset = offsetToNextToken(match, matchOffset, match.length); + destinationOffset = offsetToNextToken(destination, destinationOffset, destination.length); + continue; + } + } else { + result = false; + break; + } + } + matchOffset++; + destinationOffset++; + } + if (result + && (match.length != destination.length && matchOffset != match.length || destinationOffset != destination.length) + && !matchAll) { + result = isMatchAll(match, matchOffset, match.length) + || isMatchAll(destination, destinationOffset, destination.length); + } + return result; + } + + static boolean isMatchAll(Buffer str, int offset, int count) { + boolean result = false; + if (str != null && offset < str.length) { + byte offByte = str.byteAt(offset); + byte offBytePlusOne = (byte) ((offset + 1) < str.length ? str.byteAt(offset + 1) : ' '); + if (offset + 1 < str.length && isDelimiter(offByte)) { + if (offBytePlusOne == MATCH_ALL) { + result = true; + } + } else if ((offByte == MATCH_ALL || offByte == MATCH_ELEMENT) + && (isWhiteSpace(str, offset + 1, count) || isDelimiter(offBytePlusOne) || offset + 1 == str.length)) { + result = true; + } + } + return result; + } + + static boolean isMatchElement(Buffer str, int offset, int count) { + boolean result = false; + if (str.byteAt(offset) == MATCH_ELEMENT) { + if (offset == 0 || isDelimiter(str.byteAt(offset - 1))) { + result = ((offset + 1) >= str.length) || isDelimiter(str.byteAt(offset + 1)) + || isWhiteSpace(str, offset + 1, count); + } + } + return result; + } + + private static boolean isWhiteSpace(Buffer str, int offset, int len) { + boolean result = true; + while ((offset < len)) { + if (str.byteAt(offset++) > ' ') { + result = false; + break; + } + } + return result; + } + + static int offsetToNextToken(Buffer str, int offset, int len) { + while (offset < len) { + if (isDelimiter(str.byteAt(offset))) + break; + offset++; + } + return offset; + } + + static int offsetToNextElement(Buffer str, int offset, int len) { + int result = -1; + int count = offset; + while (count < len) { + if (isDelimiter(str.byteAt(count))) { + result = ++count; + // check for double placed elements ... + while (count < len && isDelimiter(str.byteAt(count))) { + result = ++count; + } + break; + } + count++; + } + return result; + } + + private static boolean containsDelimeter(Buffer str, int offset, int len) { + boolean result = false; + while (offset < len) { + if (isDelimiter(str.byteAt(offset++))) { + result = true; + break; + } + } + return result; + } + + private static final boolean isDelimiter(byte b) { + for (int i = 0; i < DELIMETERS.length; i++) { + if (b == DELIMETERS[i]) { + return true; + } + } + return false; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html Fri Nov 21 12:44:40 2008 @@ -0,0 +1,25 @@ +!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + + + + + +destination utility classes + + + \ No newline at end of file Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.processor; + +import org.apache.activeblaze.BaseService; +import org.apache.activeblaze.BlazeException; +import org.apache.activeblaze.ExceptionListener; +import org.apache.activeblaze.Processor; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Chains Processors together + * + */ +public class ChainedProcessor extends BaseService implements Processor { + private static final Log LOG = LogFactory.getLog(ChainedProcessor.class); + private Processor next; + private Processor prev; + protected ExceptionListener exceptionListener; + + protected ChainedProcessor() { + } + + /** + * @return the next + */ + public Processor getNext() { + return this.next; + } + + /** + * Set Next at the end of the chain + * @param next + * + */ + public void setEnd(Processor next) { + ChainedProcessor target = this; + Processor n = getNext(); + while (n != null) { + if (n instanceof ChainedProcessor) { + ChainedProcessor cn = (ChainedProcessor) n; + target = cn; + n = cn.getNext(); + } + } + if(next instanceof ChainedProcessor) { + target.setNextChain((ChainedProcessor) next); + }else { + target.next=next; + } + } + + /** + * Set the next + * @param next + */ + public void setNext(Processor next) { + this.next=next; + } + + /** + * @return the prev + */ + public Processor getPrev() { + return this.prev; + } + + /** + * Set the next chain + * + * @param p + */ + public void setNextChain(ChainedProcessor p) { + ChainedProcessor target = this; + Processor n = getNext(); + while (n != null) { + if (n instanceof ChainedProcessor) { + ChainedProcessor cn = (ChainedProcessor) n; + target = cn; + n = cn.getNext(); + } + } + target.next=p; + p.setPrev(target); + if (this.exceptionListener != null && p.exceptionListener == null) { + p.exceptionListener = this.exceptionListener; + } + } + + /** + * @param prev + * the prev to set + */ + public void setPrev(Processor prev) { + this.prev = prev; + } + + public boolean init() throws Exception { + boolean result = super.init(); + if (result && this.next != null) { + result = this.next.init(); + } + return result; + } + + public boolean shutDown() throws Exception { + boolean result = super.shutDown(); + if (result && this.next != null) { + result = this.next.shutDown(); + } + return result; + } + + public boolean start() throws Exception { + boolean result = super.start(); + if (result && this.next != null) { + result = this.next.start(); + } + return result; + } + + public boolean stop() throws Exception { + boolean result = super.stop(); + if (result && this.next != null) { + result = this.next.stop(); + } + return result; + } + + public void downStream(Packet packet) throws Exception { + if (this.next != null) { + this.next.downStream(packet); + } + } + + public void upStream(Packet packet) throws Exception { + if (this.prev != null) { + this.prev.upStream(packet); + } + } + + public void setExceptionListener(ExceptionListener l) { + this.exceptionListener = l; + } + + protected void fireException(String reason, Exception e) { + doFireException(new BlazeException(reason, e)); + } + + protected void fireException(Exception e) { + doFireException(new BlazeException(e)); + } + + protected void fireException(String reason) { + doFireException(new BlazeException(reason)); + } + + protected void doFireException(Exception e) { + ExceptionListener l = this.exceptionListener; + if (l != null) { + l.onException(e); + } else { + LOG.error("No exception listener - caught exception ", e); + } + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.processor; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.zip.Deflater; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; +import org.apache.activemq.protobuf.Buffer; + +/** + * Compresses PacketData + * + */ +public class CompressionProcessor extends ChainedProcessor { + private int compressionLimit = 8192; + private int compressionLevel = Deflater.BEST_COMPRESSION; + private class CompressionStream extends GZIPOutputStream { + CompressionStream(OutputStream out, int size) throws IOException { + super(out, size); + this.def.setLevel(getCompressionLevel()); + } + } + + /** + * @return the limit beyond which data will be compresses + */ + public int getCompressionLimit() { + return this.compressionLimit; + } + + /** + * @param compressionLimit - + * set the compressionLimit + */ + public void setCompressionLimit(int compressionLimit) { + this.compressionLimit = compressionLimit; + } + + public void downStream(Packet packet) throws Exception { + Buffer data = packet.getPacketData().getPayload(); + if (data != null && data.length >= this.compressionLimit) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(data.length); + try { + GZIPOutputStream gzipOut = new CompressionStream(bytesOut, data.length); + gzipOut.write(data.toByteArray()); + gzipOut.close(); + bytesOut.close(); + byte[] result = bytesOut.toByteArray(); + packet.getPacketData().clearPayload(); + // need to clone to get sizing correct + packet = packet.clone(); + packet.getPacketData().setPayload(new Buffer(result)); + } catch (IOException e) { + fireException("Failed to deflate packet", e); + } + } + super.downStream(packet); + } + + public void upStream(Packet packet) throws Exception { + Buffer data = packet.getPacketData().getPayload(); + ; + if (CompressionProcessor.isCompressed(data)) { + InputStream bytesIn = data.newInput(); + try { + GZIPInputStream gzipIn = new GZIPInputStream(bytesIn); + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + byte[] buffer = new byte[4096]; + int bytesRead = 0; + while ((bytesRead = gzipIn.read(buffer, 0, buffer.length)) > 0) { + bytesOut.write(buffer, 0, bytesRead); + } + gzipIn.close(); + bytesIn.close(); + byte[] result = bytesOut.toByteArray(); + bytesOut.close(); + packet.getPacketData().clearPayload(); + // need to clone to get sizing correct + packet = packet.clone(); + packet.getPacketData().setPayload(new Buffer(result)); + } catch (IOException e) { + fireException("Failed to inflate packet", e); + } + } + super.upStream(packet); + } + + static boolean isCompressed(Buffer data) { + boolean result = false; + if (data != null && data.length > 2) { + int ch1 = (int) (data.byteAt(data.offset) & 0xff); + int ch2 = (int) (data.byteAt(data.offset + 1) & 0xff); + int magic = (ch1 | (ch2 << 8)); + result = (magic == GZIPInputStream.GZIP_MAGIC); + } + return result; + } + + /** + * @return the compressionLevel + */ + public int getCompressionLevel() { + return this.compressionLevel; + } + + /** + * @param compressionLevel + * the compressionLevel to set These are the values past to the Deflater - 1 for best speed, 9 for best + * compression (the default) + */ + public void setCompressionLevel(int compressionLevel) { + this.compressionLevel = compressionLevel; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.processor; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.activeblaze.BlazeConfiguration; +import org.apache.activemq.protobuf.Buffer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Fragments a packet + */ + +public class FragmentationProcessor extends ChainedProcessor { + private static final Log LOG = LogFactory.getLog(FragmentationProcessor.class); + private int maxPacketSize = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE; + private int maxCacheSize = 16 * 1024; + private Map> cache = new LinkedHashMap>() { + protected boolean removeEldestEntry(Map.Entry> eldest) { + return removeEldestCacheEntry(eldest.getKey()); + } + }; + + + public void downStream(Packet packet) throws Exception { + int size = packet.getPacketData().serializedSizeUnframed(); + if (size > this.getMaxPacketSize()) { + Buffer payload = packet.getPacketData().getPayload(); + packet.getPacketData().clearPayload(); + packet=packet.clone(); + int headerSize = packet.getPacketData().serializedSizeUnframed(); + int fragmentSize = getMaxPacketSize()-headerSize; + int length = payload.length; + + int offset=payload.offset; + int numberOfParts = length/fragmentSize; + if (length%fragmentSize!=0) { + numberOfParts++; + } + int partNumber = 0; + while (offset < length) { + Buffer nextPayload = new Buffer(payload.data,offset,fragmentSize); + offset += fragmentSize; + Packet next = packet.clone(); + next.getPacketData().setPayload(nextPayload); + next.getPacketData().setNumberOfParts(numberOfParts); + next.getPacketData().setPartNumber(partNumber); + partNumber++; + super.downStream(next); + + } + }else { + super.downStream(packet); + } + } + + public void upStream(Packet packet) throws Exception { + if (packet.getPacketData().getNumberOfParts() > 1) { + synchronized(this.cache) { + List value = this.cache.get(packet.getId()); + if (value == null) { + value = new ArrayList(packet.getPacketData().getNumberOfParts()); + this.cache.put(packet.getId(), value); + } + value.add(packet.getPacketData().getPartNumber(),packet); + if (value.size()==packet.getPacketData().getNumberOfParts()) { + Packet result = packet.clone(); + result.getPacketData().clearPayload(); + result.getPacketData().clearNumberOfParts(); + result.getPacketData().clearPartNumber(); + result.getPacketData().setNumberOfParts(1); + result.getPacketData().setPartNumber(0); + int size=0; + for (Packet p:value) { + size+= p.getPacketData().getPayload().length; + } + byte[] data = new byte[size]; + int offset=0; + for (Packet p:value) { + Buffer src = p.getPacketData().getPayload(); + System.arraycopy(src.data, src.offset, data, offset, src.length); + offset+=src.length; + } + result.getPacketData().setPayload(new Buffer(data)); + this.cache.remove(packet.getId()); + super.upStream(result); + } + } + } else { + super.upStream(packet); + } + } + + /** + * @return the maxPacketSize + */ + public int getMaxPacketSize() { + return this.maxPacketSize; + } + + /** + * @param maxPacketSize the maxPacketSize to set + */ + public void setMaxPacketSize(int maxPacketSize) { + this.maxPacketSize = maxPacketSize; + } + + /** + * @return the maxCacheSize + */ + public int getMaxCacheSize() { + return this.maxCacheSize; + } + + /** + * @param maxCacheSize the maxCacheSize to set + */ + public void setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } + + protected boolean removeEldestCacheEntry(String id) { + boolean result = false; + if (this.cache.size()> getMaxCacheSize()) { + result = true; + LOG.warn("Cache too big - Discarding fragmented packets for " + id); + } + return result; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.processor; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import org.apache.activeblaze.wire.PacketData; + +/** + * Wrapper for PacketData + * + */ +public final class Packet { + final private SocketAddress from; + private InetSocketAddress to; + private String id; + final private PacketData packetData; + + /** + * Internal Constructor + * + * @param id + * @param data + */ + private Packet(String id, PacketData data) { + this.id = id; + this.packetData = data; + this.from = null; + this.to = null; + } + + /** + * Construct a Packet from PacketData + * + * @param data + */ + public Packet(PacketData data) { + this.packetData = data; + if (data.hasMessageId()) { + this.id = this.packetData.getMessageId().toString(); + } + this.from = null; + this.to = null; + } + + /** + * Construct a Packet received + * + * @param from + * @param data + */ + public Packet(SocketAddress from, PacketData data) { + this.from = from; + this.packetData = data; + if (data.hasMessageId()) { + this.id = this.packetData.getMessageId().toString(); + } + this.to = null; + } + + /** + * Construct a Packet to send + * + * @param toAddress + * @param toPort + * @param data + */ + public Packet(InetAddress toAddress, int toPort, PacketData data) { + this.to = new InetSocketAddress(toAddress, toPort); + this.packetData = data; + this.id = this.packetData.getMessageId().toString(); + this.from = null; + } + + /** + * Construct a Packet to send + * + * @param toAddress + * @param toPort + * @param data + */ + public Packet(String toAddress, int toPort, PacketData data) { + this.to = new InetSocketAddress(toAddress, toPort); + this.packetData = data; + this.id = this.packetData.getMessageId().toString(); + this.from = null; + } + + public String toString() { + StringBuilder builder = new StringBuilder("Packet:"); + builder.append(getId()); + builder.append("["); + builder.append(getPacketData().toString()); + builder.append("]"); + return builder.toString(); + } + + /** + * @return the id + */ + public String getId() { + return this.id; + } + + /** + * @return the packetData + */ + public PacketData getPacketData() { + return this.packetData; + } + + public Packet clone() { + return new Packet(this.id, this.packetData.clone()); + } + + /** + * @return the from + */ + public SocketAddress getFrom() { + return this.from; + } + + /** + * @return the to + */ + public InetSocketAddress getTo() { + return this.to; + } + + /** + * @param to the to to set + */ + public void setTo(InetSocketAddress to) { + this.to = to; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.processor; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.activeblaze.BaseService; +import org.apache.activeblaze.util.BitArrayBin; +import org.apache.activeblaze.wire.PacketData; +import org.apache.activemq.protobuf.Buffer; + + +/** + * Checks for duplicates + * + */ +public class PacketAudit extends BaseService { + private LinkedHashMap cache; + private int maxChannels = 256; + private int maxAuditDepth = 1024; + + public boolean shutDown() throws Exception { + boolean result = super.shutDown(); + if (result) { + this.cache = null; + } + return result; + } + + public boolean start() throws Exception { + boolean result = super.start(); + if (result) { + if (this.cache == null) { + this.cache = new LinkedHashMap() { + protected boolean removeEldestEntry( + Map.Entry eldest) { + return size() > getMaxChannels(); + } + }; + } + } + return result; + } + + /** + * @return the maxChannels + */ + public int getMaxChannels() { + return this.maxChannels; + } + + /** + * @param maxChannels + * the maxChannels to set + */ + public void setMaxChannels(int maxChannels) { + this.maxChannels = maxChannels; + } + + /** + * tests for a duplicate message + * + * @param packet + * @return + */ + public boolean isDuplicate(Packet packet) { + PacketData data = packet.getPacketData(); + return isDuplicate(data.getProducerId(), data.getMessageSequence()); + } + + /** + * tests for a duplicate message + * + * @param producerId + * @param id + * @return + */ + public boolean isDuplicate(String producerId, long id) { + return isDuplicate(new Buffer(producerId), id); + } + + /** + * tests for a duplicate message + * + * @param key + * @param id + * @return + */ + public boolean isDuplicate(Buffer key, long id) { + boolean result = false; + LinkedHashMap theCache = this.cache; + if (theCache != null) { + synchronized (theCache) { + BitArrayBin bin = theCache.get(key); + if (bin == null) { + bin = new BitArrayBin(getMaxAuditDepth()); + theCache.put(key, bin); + } + result = bin.setBit(id, true); + } + } + return result; + } + + /** + * @return the maxAuditDepth + */ + public int getMaxAuditDepth() { + return this.maxAuditDepth; + } + + /** + * @param maxAuditDepth + * the maxAuditDepth to set + */ + public void setMaxAuditDepth(int maxAuditDepth) { + this.maxAuditDepth = maxAuditDepth; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.processor; + +import org.apache.activeblaze.wire.MessageType; + +/** + * @author rajdavies + * + */ +public interface PacketMessageType { + /** + * @return the type of Packet + */ + MessageType type(); +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html Fri Nov 21 12:44:40 2008 @@ -0,0 +1,25 @@ +!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + + + + + +Processors for packets sent on the network + + + \ No newline at end of file Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.reliable; + +import java.util.Map; +import org.apache.activeblaze.impl.processor.ChainedProcessor; +import org.apache.activeblaze.util.ObjectFinder; +import org.apache.activeblaze.util.PropertyUtil; + +/** + * Find a reliable implementation + * + */ +public class ReliableFactory { + + private static final ObjectFinder OBJECT_FINDER = new ObjectFinder("META-INF/services/org/apache/activeblaze/reliable/"); + + /** + * @param location + * @return the configured transport from its URI + * @throws Exception + */ + public static ChainedProcessor get(String location) throws Exception { + ChainedProcessor result = findReliable(location); + configure(result, location); + return result; + } + + static void configure(ChainedProcessor transport, String location) throws Exception { + Map options = PropertyUtil.parseParameters(location); + PropertyUtil.setProperties(transport, options); + } + + private static ChainedProcessor findReliable(String location) throws Exception { + String scheme = PropertyUtil.stripBefore(location, '?'); + if (scheme == null) { + throw new IllegalArgumentException("Reliability scheme not specified: [" + location + "]"); + } + ChainedProcessor result = (ChainedProcessor) OBJECT_FINDER.newInstance(scheme); + return result; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.reliable.flow; + +import org.apache.activeblaze.impl.processor.ChainedProcessor; +import org.apache.activeblaze.impl.processor.Packet; + +/** + * Simple FlowControl + * + */ +public class SimpleFlow extends ChainedProcessor { + int maxWindowSize = 4 * 1024; + int windowSize = 0; + int pauseTime = 2; + + public void downStream(Packet p) throws Exception { + this.windowSize += p.getPacketData().serializedSizeFramed(); + if (this.windowSize >= this.maxWindowSize) { + Thread.sleep(this.pauseTime); + this.windowSize = 0; + } + super.downStream(p); + } + + /** + * @return the maxWindowSize + */ + public int getMaxWindowSize() { + return this.maxWindowSize; + } + + /** + * @param maxWindowSize + * the maxWindowSize to set + */ + public void setMaxWindowSize(int maxWindowSize) { + this.maxWindowSize = maxWindowSize; + } + + /** + * @return the pauseTime + */ + public int getPauseTime() { + return this.pauseTime; + } + + /** + * @param pauseTime + * the pauseTime to set + */ + public void setPauseTime(int pauseTime) { + this.pauseTime = pauseTime; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.reliable.simple; + +import org.apache.activeblaze.impl.processor.ChainedProcessor; +import org.apache.activeblaze.impl.reliable.flow.SimpleFlow; + +/** + * Very basic (none) reliability + * + */ +public class SimpleReliableProcessor extends ChainedProcessor{ + + private SimpleFlow simpleFlow; + + /** + * Constructor + */ + public SimpleReliableProcessor() { + this.simpleFlow=new SimpleFlow(); + setEnd(this.simpleFlow); + } + +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.transport; + +import java.net.URI; +import org.apache.activeblaze.BlazeConfiguration; +import org.apache.activeblaze.impl.processor.PacketAudit; + +/** + * Base Class for transports + * + */ +public abstract class BaseTransport extends ThreadChainedProcessor{ + static final int DEFAULT_MAX_PACKET_SIZE = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE; + static final int DEFAULT_BUFFER_SIZE = 64 * 1024; + private URI localURI; + private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE; + private int bufferSize = DEFAULT_BUFFER_SIZE; + private int soTimeout = 2000; + private int timeToLive = 1; + private boolean loopBack = false; + protected final PacketAudit audit = new PacketAudit(); + private boolean broadcast = false; + private boolean enableAudit = false; + + + public boolean init() throws Exception { + boolean result = super.init(); + if(result) { + this.audit.init(); + } + return result; + } + + + public boolean shutDown() throws Exception { + boolean result = super.shutDown(); + if(result) { + this.audit.shutDown(); + } + return result; + } + + + public boolean start() throws Exception { + boolean result = super.start(); + if(result) { + this.audit.start(); + } + return result; + } + + + public boolean stop() throws Exception { + boolean result = super.stop(); + if(result) { + this.audit.stop(); + } + return result; + } + + /** + * @return the localURI + */ + public URI getLocalURI(){ + return this.localURI; + } + + /** + * @param localURI the localURI to set + */ + public void setLocalURI(URI localURI){ + this.localURI = localURI; + } + + /** + * @return the maxPacketSize + */ + public int getMaxPacketSize(){ + return this.maxPacketSize; + } + + /** + * @param maxPacketSize the maxPacketSize to set + */ + public void setMaxPacketSize(int maxPacketSize){ + this.maxPacketSize = maxPacketSize; + } + + /** + * @return the bufferSize + */ + public int getBufferSize(){ + return this.bufferSize; + } + + /** + * @param bufferSize the bufferSize to set + */ + public void setBufferSize(int bufferSize){ + this.bufferSize = bufferSize; + } + + /** + * @return the soTimeout + */ + public int getSoTimeout(){ + return this.soTimeout; + } + + /** + * @param soTimeout the soTimeout to set + */ + public void setSoTimeout(int soTimeout){ + this.soTimeout = soTimeout; + } + + /** + * @return the timeToLive + */ + public int getTimeToLive(){ + return this.timeToLive; + } + + /** + * @param timeToLive the timeToLive to set + */ + public void setTimeToLive(int timeToLive){ + this.timeToLive = timeToLive; + } + + /** + * @return the loopBack + */ + public boolean isLoopBack(){ + return this.loopBack; + } + + /** + * @param loopBack the loopBack to set + */ + public void setLoopBack(boolean loopBack){ + this.loopBack = loopBack; + } + + /** + * @return the audit + */ + protected PacketAudit getAudit(){ + return this.audit; + } + + /** + * @return the broadcast + */ + public boolean isBroadcast() { + return this.broadcast; + } + + /** + * @param broadcast + * the broadcast to set + */ + public void setBroadcast(boolean broadcast) { + this.broadcast = broadcast; + } + + + /** + * @return the enableAudit + */ + public boolean isEnableAudit() { + return this.enableAudit; + } + + + /** + * @param enableAudit the enableAudit to set + */ + public void setEnableAudit(boolean enableAudit) { + this.enableAudit = enableAudit; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java ------------------------------------------------------------------------------ svn:eol-style = native