Return-Path: Delivered-To: apmail-incubator-geronimo-cvs-archive@www.apache.org Received: (qmail 87301 invoked from network); 3 Mar 2004 15:27:36 -0000 Received: from daedalus.apache.org (HELO mail.apache.org) (208.185.179.12) by minotaur-2.apache.org with SMTP; 3 Mar 2004 15:27:36 -0000 Received: (qmail 90386 invoked by uid 500); 3 Mar 2004 15:27:29 -0000 Delivered-To: apmail-incubator-geronimo-cvs-archive@incubator.apache.org Received: (qmail 90354 invoked by uid 500); 3 Mar 2004 15:27:29 -0000 Mailing-List: contact geronimo-cvs-help@incubator.apache.org; run by ezmlm Precedence: bulk list-help: list-unsubscribe: list-post: Reply-To: geronimo-dev@incubator.apache.org Delivered-To: mailing list geronimo-cvs@incubator.apache.org Received: (qmail 90327 invoked from network); 3 Mar 2004 15:27:28 -0000 Received: from unknown (HELO minotaur.apache.org) (209.237.227.194) by daedalus.apache.org with SMTP; 3 Mar 2004 15:27:28 -0000 Received: (qmail 87259 invoked by uid 1782); 3 Mar 2004 15:27:33 -0000 Date: 3 Mar 2004 15:27:33 -0000 Message-ID: <20040303152733.87258.qmail@minotaur.apache.org> From: gdamour@apache.org To: incubator-geronimo-cvs@apache.org Subject: cvs commit: incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging MsgQueue.java ServerProcessors.java X-Spam-Rating: daedalus.apache.org 1.6.2 0/1000/N X-Spam-Rating: minotaur-2.apache.org 1.6.2 0/1000/N gdamour 2004/03/03 07:27:33 Modified: sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging MsgQueue.java ServerProcessors.java Added: sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication ReplicationTest.java sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication UpdateEvent.java UpdateListener.java SimpleReplicatedMap.java ReplicationException.java ReplicationCapable.java ReplicationMember.java Log: A simple replication framework based on the messaging networking infrastructure. A basic Map has been implemented in order to depict how this framework could be leverage and a use-case highlights how the classes fit together. Revision Changes Path 1.1 incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication/ReplicationTest.java Index: ReplicationTest.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.datastore.impl.remote.replication; import java.net.InetAddress; import java.util.Collections; import java.util.HashMap; import java.util.Map; import junit.framework.TestCase; import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo; import org.apache.geronimo.datastore.impl.remote.messaging.ServerNode; /** * * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $ */ public class ReplicationTest extends TestCase { SimpleReplicatedMap replicant1; ReplicationMember replication1; ReplicationMember replication2; protected void setUp() throws Exception { replicant1 = new SimpleReplicatedMap(); replication1 = new ReplicationMember("Replication1", new String[] {"Node2"}); InetAddress address = InetAddress.getLocalHost(); NodeInfo nodeInfo1 = new NodeInfo("Node1", address, 8080); ServerNode server1 = new ServerNode(nodeInfo1, Collections.singleton(replication1)); server1.doStart(); replication1.doStart(); replication2 = new ReplicationMember("Replication1", new String[] {"Node1"}); NodeInfo nodeInfo2 = new NodeInfo("Node2", address, 8082); ServerNode server2 = new ServerNode(nodeInfo2, Collections.singleton(replication2)); server2.doStart(); replication2.doStart(); server2.join(nodeInfo1); } public void testUseCase() { replicant1.put("test1", "value1"); replication1.registerReplicantCapable(replicant1); Object id = replicant1.getID(); SimpleReplicatedMap replicant2 = (SimpleReplicatedMap) replication2.retrieveReplicantCapable(id); assertNotNull("Not been registered", replicant2); assertEquals("value1", replicant2.get("test1")); replicant1.put("test2", "value2"); assertEquals("value2", replicant2.get("test2")); replicant1.remove("test1"); assertNull(replicant2.get("test1")); Map tmp = new HashMap(); tmp.put("test3", "value3"); replicant1.putAll(tmp); assertEquals("value3", replicant2.get("test3")); replicant2.remove("test3"); assertNull(replicant1.get("test3")); } } 1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/UpdateEvent.java Index: UpdateEvent.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.datastore.impl.remote.replication; import java.io.Serializable; /** * Event to be multicasted by a ReplicationCapable upon modification. * * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $ */ public interface UpdateEvent extends Serializable { /** * Gets the event identifier. * * @return Event identifier. */ public int getId(); /** * Gets the target of the event. It must be the ReplicationCapable which * has been updated. * * @return Instance which has been updated. */ public Object getTarget(); /** * Sets the target of this event. * * @param aTarget Instance which has been updated. */ public void setTarget(Object aTarget); } 1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/UpdateListener.java Index: UpdateListener.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.datastore.impl.remote.replication; /** * UpdateEvent listener. * * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $ */ public interface UpdateListener { /** * Fire an UpdateEvent on this listener. * * @param anEvent UpdateEvent to be fired. */ public void fireUpdateEvent(UpdateEvent anEvent); } 1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/SimpleReplicatedMap.java Index: SimpleReplicatedMap.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.datastore.impl.remote.replication; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; /** * A simple Map, which is ReplicationCapable aware. * * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $ */ public class SimpleReplicatedMap implements Map, ReplicationCapable { private Map delegate; private Set listeners; private Object objectID; public SimpleReplicatedMap() { delegate = new HashMap(); listeners = new HashSet(); } public void setID(Object anID) { objectID = anID; } public Object getID() { return objectID; } public void addUpdateListener(UpdateListener aListener) { synchronized(listeners) { listeners.add(aListener); } } public void removeUpdateListener(UpdateListener aListener) { synchronized(listeners) { listeners.remove(aListener); } } private void multicastEvent(MapUpdateEvent anEvent) { synchronized(listeners) { for (Iterator iter = listeners.iterator(); iter.hasNext();) { UpdateListener listener = (UpdateListener) iter.next(); listener.fireUpdateEvent(anEvent); } } } public void mergeWithUpdate(UpdateEvent anEvent) throws ReplicationException { MapUpdateEvent event = (MapUpdateEvent) anEvent; int id = anEvent.getId(); switch (id) { case MapUpdateEvent.CLEAR: delegate.clear(); break; case MapUpdateEvent.PUT: delegate.put(event.key, event.value); break; case MapUpdateEvent.PUTALL: delegate.putAll(event.map); break; case MapUpdateEvent.REMOVE: delegate.remove(event.key); break; default: throw new ReplicationException("Undefined event id."); } } public void clear() { multicastEvent(new MapUpdateEvent(MapUpdateEvent.CLEAR, this)); delegate.clear(); } public boolean containsKey(Object key) { return delegate.containsKey(key); } public boolean containsValue(Object value) { return delegate.containsValue(value); } public Set entrySet() { return delegate.entrySet(); } public boolean equals(Object obj) { return delegate.equals(obj); } public Object get(Object key) { return delegate.get(key); } public int hashCode() { return delegate.hashCode(); } public boolean isEmpty() { return delegate.isEmpty(); } public Set keySet() { return delegate.keySet(); } public Object put(Object key, Object value) { multicastEvent( new MapUpdateEvent(MapUpdateEvent.PUT, this, key, value)); return delegate.put(key, value); } public void putAll(Map t) { multicastEvent(new MapUpdateEvent(t, this)); delegate.putAll(t); } public Object remove(Object key) { multicastEvent( new MapUpdateEvent(MapUpdateEvent.REMOVE, this, key, null)); return delegate.remove(key); } public int size() { return delegate.size(); } public Collection values() { return delegate.values(); } public static class MapUpdateEvent implements UpdateEvent { private static final int BASE = 1; public static final int CLEAR = 1 + BASE; public static final int PUT = 2 + BASE; public static final int PUTALL = 3 + BASE; public static final int REMOVE = 4 + BASE; private final int id; private Object target; private final Object key; private final Object value; private Map map; public MapUpdateEvent(int anId, Object aTarget) { this(anId, aTarget, null, null); } public MapUpdateEvent(int anId, Object aTarget, Object aKey, Object aValue) { id = anId; target = aTarget; key = aKey; value = aValue; } public MapUpdateEvent(Map aMap, Object aTarget) { this(PUTALL, aTarget, null, null); map = aMap; } public int getId() { return id; } public Object getTarget() { return target; } public void setTarget(Object aTarget) { target = aTarget; } } } 1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationException.java Index: ReplicationException.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.datastore.impl.remote.replication; /** * Exception raised when a replication is not possible. * * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $ */ public class ReplicationException extends Exception { public ReplicationException(String aMessage) { super(aMessage); } public ReplicationException(String aMessage, Throwable aNested) { super(aMessage, aNested); } } 1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationCapable.java Index: ReplicationCapable.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.datastore.impl.remote.replication; import java.io.Serializable; /** * * TODO introduce versioning. * * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $ */ public interface ReplicationCapable extends Serializable { /** * Identifier of this ReplicationCapable. It identifies uniquely the * instance in the scope of a replication group. *
* Identfiers are automatically created by ReplicationMembers. * * @return Identifier of this instance. */ public Object getID(); /** * Sets the identifier of this instance in the scope of the replication * group managing it. * * @param anID Identifier. */ public void setID(Object anID); /** * Adds an UpdateEvent listener. * * @param aListener Listener to be notified when an update is performed * on this instance. */ public void addUpdateListener(UpdateListener aListener); /** * Removes the specified UpdateListener. * * @param aListener Listener to be removed. */ public void removeUpdateListener(UpdateListener aListener); /** * Merges an UpdateEvent with the state of this instance. * * @param anEvent UpdateEvent to be merged with this instance. * @throws ReplicationException Indicates that the merge can not be * performed. */ public void mergeWithUpdate(UpdateEvent anEvent) throws ReplicationException; } 1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationMember.java Index: ReplicationMember.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.datastore.impl.remote.replication; import java.io.Serializable; import java.util.HashMap; import java.util.Map; import org.apache.geronimo.datastore.impl.remote.messaging.CommandRequest; import org.apache.geronimo.datastore.impl.remote.messaging.CommandResult; import org.apache.geronimo.datastore.impl.remote.messaging.Connector; import org.apache.geronimo.datastore.impl.remote.messaging.HeaderOutInterceptor; import org.apache.geronimo.datastore.impl.remote.messaging.Msg; import org.apache.geronimo.datastore.impl.remote.messaging.MsgBody; import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeader; import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeaderConstants; import org.apache.geronimo.datastore.impl.remote.messaging.MsgOutInterceptor; import org.apache.geronimo.datastore.impl.remote.messaging.RequestSender; import org.apache.geronimo.gbean.GBean; import org.apache.geronimo.gbean.GBeanContext; import org.apache.geronimo.gbean.WaitingException; /** * A replication group member. *
* This is a Connector in charge of replicating the state of registered * ReplicantCapables across N-nodes, which constitute a replication group. *
* Replication members are organized as follow: *
   * ReplicationMember -- MTO -- ServerNode -- MTM -- ServerNode -- OTM -- ReplicationMember
   * 
* * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $ */ public class ReplicationMember implements UpdateListener, Connector, GBean { /** * Name of the replication group. */ private final String name; /** * ReplicantID to ReplicantCapable Map. */ private final Map idToReplicant; /** * Names of the nodes hosting the other members of the replication group * of this member. */ private String[] targetNodes; /** * Output to be used to send requests. */ private MsgOutInterceptor requestOut; /** * Output to be used to send results. */ private MsgOutInterceptor resultOut; /** * Requests sender. */ private final RequestSender sender; /** * Creates a replication group member. * * @param aName Name of the replication group owning this member. * @param aTargetNodes Names of the nodes hosting the other members of the * replication group containing this member. */ public ReplicationMember(String aName, String[] aTargetNodes) { if ( null == aName ) { throw new IllegalArgumentException("Name is required"); } else if ( null == aTargetNodes ) { throw new IllegalArgumentException("Node names is required"); } name = aName; targetNodes = aTargetNodes; idToReplicant = new HashMap(); sender = new RequestSender(); } public String getName() { return name; } public void fireUpdateEvent(UpdateEvent anEvent) { // One does not send the actual ReplicantCapable in the case of an // update. Instead, one sends only its identifier. ReplicationCapable target = (ReplicationCapable) anEvent.getTarget(); anEvent.setTarget(target.getID()); sender.sendSyncRequest( new CommandRequest("mergeWithUpdate", new Object[] {anEvent}), requestOut); } /** * Merges an UpdateEvent with a registered ReplicationCapable. * * @param anEvent Update event to be merged. * @throws ReplicationException Indicates that the merge can not be * performed. */ public void mergeWithUpdate(UpdateEvent anEvent) throws ReplicationException { ReplicantID id = (ReplicantID) anEvent.getTarget(); ReplicationCapable replicationCapable; synchronized(idToReplicant) { replicationCapable = (ReplicationCapable) idToReplicant.get(id); } if ( null == replicationCapable ) { throw new ReplicationException( "No ReplicantCapable with the id {" + id + "}"); } replicationCapable.mergeWithUpdate(anEvent); } /** * Registers a ReplicantCapable. From now, UpdateEvents multicasted * by the provided ReplicantCapable are also pushed to the replication * group. * * @param aReplicant ReplicantCapable to be controlled by this group. */ public void registerReplicantCapable(ReplicationCapable aReplicant) { ReplicantID id = new ReplicantID(); aReplicant.setID(id); sender.sendSyncRequest( new CommandRequest("registerLocalReplicantCapable", new Object[] {aReplicant}), requestOut); synchronized(idToReplicant) { idToReplicant.put(id, aReplicant); aReplicant.addUpdateListener(this); } } /** * This method is for internal use only. *
* It registers with this member a ReplicationCapable, which has been * registered by a remote member. * * @param aReplicant ReplicantCapable to be locally registered. */ public void registerLocalReplicantCapable(ReplicationCapable aReplicant) { synchronized(idToReplicant) { aReplicant.addUpdateListener(this); idToReplicant.put(aReplicant.getID(), aReplicant); } } /** * Retrieves the ReplicationCapable having the specified id. * * @param anID Replicant identifier. * @return ReplicantCapable having the specified id or null if such an * identifier is not known. */ public ReplicationCapable retrieveReplicantCapable(Object anID) { synchronized(idToReplicant) { return (ReplicationCapable) idToReplicant.get(anID); } } public void setOutput(MsgOutInterceptor anOut) { if ( null != anOut ) { MsgOutInterceptor out = new HeaderOutInterceptor( MsgHeaderConstants.DEST_CONNECTOR, name, new HeaderOutInterceptor( MsgHeaderConstants.DEST_NODE, targetNodes, anOut)); requestOut = new HeaderOutInterceptor( MsgHeaderConstants.BODY_TYPE, MsgBody.Type.REQUEST, out); resultOut = new HeaderOutInterceptor( MsgHeaderConstants.BODY_TYPE, MsgBody.Type.RESPONSE, out); } else { requestOut = null; resultOut = null; } } public void deliver(Msg aMsg) { MsgHeader header = aMsg.getHeader(); MsgBody.Type bodyType = (MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE); if ( bodyType.equals(MsgBody.Type.REQUEST) ) { handleRequest(aMsg); } else if ( bodyType.equals(MsgBody.Type.RESPONSE) ) { handleResponse(aMsg); } } /** * Handles a request Msg. * * @param aMsg Request Msg to be handled. */ protected void handleRequest(Msg aMsg) { MsgBody body = aMsg.getBody(); MsgHeader header = aMsg.getHeader(); Object sourceNode = header.getHeader(MsgHeaderConstants.SRC_NODE); Object id = header.getHeader(MsgHeaderConstants.CORRELATION_ID); CommandRequest command; String gateway; command = (CommandRequest) body.getContent(); command.setTarget(this); CommandResult result = command.execute(); Msg msg = new Msg(); body = msg.getBody(); body.setContent(result); MsgOutInterceptor reqOut = new HeaderOutInterceptor( MsgHeaderConstants.CORRELATION_ID, id, new HeaderOutInterceptor( MsgHeaderConstants.DEST_NODE, targetNodes, new HeaderOutInterceptor( MsgHeaderConstants.DEST_CONNECTOR, name, resultOut))); reqOut.push(msg); } /** * Handles a response Msg. * * @param aMsg Response to be handled. */ protected void handleResponse(Msg aMsg) { MsgBody body = aMsg.getBody(); MsgHeader header = aMsg.getHeader(); CommandResult result; result = (CommandResult) body.getContent(); sender.setResponse( (Integer) header.getHeader(MsgHeaderConstants.CORRELATION_ID), result); } public void setGBeanContext(GBeanContext context) { } public void doStart() throws WaitingException, Exception { } public void doStop() throws WaitingException, Exception { } public void doFail() { } /** * ReplicantCapable identifier. */ private static class ReplicantID implements Serializable { private static volatile int seqId = 0; private final int id; private ReplicantID() { id = seqId++; } public int hashCode() { // TODO improve me. return id; } public boolean equals(Object obj) { if ( false == obj instanceof ReplicantID ) { return false; } ReplicantID replicantID = (ReplicantID) obj; return id == replicantID.id; } } } 1.2 +5 -1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgQueue.java Index: MsgQueue.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgQueue.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- MsgQueue.java 25 Feb 2004 13:36:15 -0000 1.1 +++ MsgQueue.java 3 Mar 2004 15:27:33 -0000 1.2 @@ -93,4 +93,8 @@ return message; } + public String toString() { + return "MsgQueue {" + name + "}"; + } + } 1.4 +13 -7 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java Index: ServerProcessors.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- ServerProcessors.java 3 Mar 2004 13:10:07 -0000 1.3 +++ ServerProcessors.java 3 Mar 2004 15:27:33 -0000 1.4 @@ -101,13 +101,19 @@ Msg msg = in.pop(); Object destNode = in.getHeader(); MsgOutInterceptor out; - try { - out = server.getOutForNode((String) destNode); - } catch (CommunicationException e) { - log.error(e); - continue; + if ( destNode instanceof String ) { + destNode = new String[] {(String) destNode}; + } + String[] dests = (String[]) destNode; + for (int i = 0; i < dests.length; i++) { + try { + out = server.getOutForNode(dests[i]); + } catch (CommunicationException e) { + log.error(e); + continue; + } + out.push(msg); } - out.push(msg); } }