bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [45/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
Date Wed, 16 Mar 2016 03:44:55 GMT
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/StateManager.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/StateManager.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/StateManager.java
deleted file mode 100644
index 26db514..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/StateManager.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.jms;
-
-import org.slf4j.Logger;
-
-import javax.jms.JMSException;
-
-
-/**
-    Specific to jms package - NOT to be used elsewhere.
-
-    The util class allows for managing the current readiness 'state' of the object which hosts it
-    along the axis of StateManager.State while being MT safe. Right now, both Connection and Session make use of it.
-
-    The lockObject is used to do timed wait's (which the host object will notify on) in case of async state changes.
-
-
-    This is not a general purpose code, but specific to state transitions mentioned in jms spec.
-
-
-    All use of the class goes like this :
-
-
-    StateManager.State prevState;
-    acquire lock:
-        if in transition state, wait.
-        if in expected state, return.
-        if in error state, return/throw exception.
-        if in valid state transition state -
-            prevState = currentState.
-            set to corresponding transition state (STARTING, CLOSING, etc).
-        Other method specific changes.
-    release lock:
-
-    nextState = prevState (in case state change failed, revert).
-
-    try {
-        attempt state change.
-        on success nextState = next valid state for this method.
-    } finally {
-        acquire lock:
-          change state to nextState
-        release lock:
-    }
-
-    * So at any given point of time, the state will be in transition ONLY when there is an attempt
-            being made to transition.
-    * The state will always be in a final state at all other points of time.
-    * No attempt will be made to change state while a transition state is currently in progress.
- */
-final class StateManager {
-  public static final long WAIT_TIME_FOR_TRANSIENT_STATE_CHANGE =
-      Long.getLong("WAIT_TIME_FOR_TRANSIENT_STATE_CHANGE", 16000L);
-
-  static enum State {
-        STARTING(true, false, true),
-        STARTED(true, false, false),
-        STOPPING(false, false, true),
-        STOPPED(false, false, false),
-        CLOSING(false, true, true),
-        CLOSED(false, true, false);
-
-        private final boolean inStartMode;
-        private final boolean inCloseMode;
-        private final boolean inTransitionMode;
-
-        State(boolean inStartMode, boolean inCloseMode, boolean inTransitionMode) {
-            this.inStartMode = inStartMode;
-            this.inCloseMode = inCloseMode;
-            this.inTransitionMode = inTransitionMode;
-        }
-
-        public boolean isInStartMode() {
-            return inStartMode;
-        }
-
-        public boolean isInCloseMode() {
-            return inCloseMode;
-        }
-
-        public boolean isInTransitionMode() {
-            return inTransitionMode;
-        }
-    }
-
-    // DO NOT do something silly like State.STARTING == currentState || State.STARTED == currentState, etc !
-    private volatile State currentState;
-    private final Object lockObject;
-
-    StateManager(State startStart, Object lockObject){
-        this.currentState = startStart;
-        this.lockObject = lockObject;
-    }
-
-    State getCurrentState() {
-        return currentState;
-    }
-
-    boolean isStarted() {
-        return State.STARTED == currentState;
-    }
-
-    boolean isInStartMode() {
-        return currentState.isInStartMode();
-    }
-
-    boolean isStopped() {
-        return State.STOPPED == currentState;
-    }
-
-    boolean isClosed() {
-        return State.CLOSED == currentState;
-    }
-
-    // NOT locking explicitly : typically, already locked on lockObject
-    boolean isInCloseMode() {
-        return currentState.isInCloseMode();
-    }
-
-    // NOT locking explicitly : typically, already locked on lockObject
-    boolean isTransitionState() {
-        return currentState.isInTransitionMode();
-    }
-
-    void setCurrentState(State currentState) {
-        this.currentState = currentState;
-    }
-
-    // NOT locking explicitly : MUST be already locked on lockObject
-    void waitForTransientStateChange(long timeout, Logger logger) throws JMSException {
-        final long startTime = SessionImpl.currentTimeMillis();
-        final int WAIT_UNIT = 100;
-        int retryCount = (int)(timeout / WAIT_UNIT);
-
-        while (isTransitionState()) {
-            try {
-                // If we are NOT locked on lockObject, this will throw exception !
-                lockObject.wait(WAIT_UNIT);
-            } catch (InterruptedException e) {
-                // bubble it up.
-                JMSException jex = new JMSException("Thread interrupted ... " + e);
-                jex.setLinkedException(e);
-                throw jex;
-            }
-            retryCount --;
-            if (retryCount <= 0) {
-                if (logger.isDebugEnabled()) DebugUtil.dumpAllStacktraces(logger);
-                // throw new JMSException("wait timeout " + (SessionImpl.currentTimeMillis() - startTime));
-                throw new JMSException("wait for " + (SessionImpl.currentTimeMillis() - startTime) + " timeout");
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("StateManager");
-        sb.append("{currentState=").append(currentState);
-        sb.append('}');
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContext.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContext.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContext.java
deleted file mode 100644
index 2e0cddd..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContext.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.jndi;
-
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-import javax.jms.ConnectionFactory;
-import javax.naming.Name;
-import javax.naming.NamingException;
-import javax.naming.directory.InitialDirContext;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Set;
-
-/**
- * Based (very very loosely) on
- * <a href="http://docs.oracle.com/javase/1.3/docs/guide/jndi/spec/spi/jndispi.fm.html">jndi guide</a>. <br/>
- * The InitialContext implementation clients should be using to get to our implementation. <br/>
- * It is possible (by configuring via administrative means for example) to use a different DirContext
- * to get to our provider implementation
- * if the various classes exposed are the same as exposed via this DirContext.<br/>
- * <p/>
- * Ideally, the env property
- * {@link javax.naming.Context.INITIAL_CONTEXT_FACTORY} "java.naming.factory.initial" is set to our factory
- * {@link HedwigInitialContextFactory} classname which will return this InitialDirContext.
- */
-public class HedwigInitialContext extends InitialDirContext {
-
-    public static final String CONNECTION_FACTORY_NAME = "jms/ConnectionFactory";
-    public static final String TOPIC_CONNECTION_FACTORY_NAME = "jms/TopicConnectionFactory";
-    // public static final String QUEUE_CONNECTION_FACTORY_NAME = "jms/QueueConnectionFactory";
-
-    // Hardcoding to point to HedwigConnectionFactoryImpl by default.
-    private static final Set<String> defaultNamesMapping;
-    static {
-        Set<String> set = new HashSet<String>(8);
-
-        // The actual name's for the various factories are bound by an admin. For convinence sake,
-        // we are providing default bindings.
-
-        // The default connection
-        set.add("jms/ConnectionFactory");
-        set.add("jms/TopicConnectionFactory");
-        // Add in future - for now, we do not support it.
-        // set.add("jms/QueueConnectionFactory");
-
-
-        set.add("ConnectionFactory");
-        set.add("TopicConnectionFactory");
-        // Add in future - for now, we do not support it.
-        // set.add("QueueConnectionFactory");
-        defaultNamesMapping = Collections.unmodifiableSet(set);
-    }
-
-    protected HedwigInitialContext(boolean lazy) throws NamingException {
-        super(lazy);
-    }
-
-    public HedwigInitialContext() throws NamingException {
-        super();
-    }
-
-    public HedwigInitialContext(Hashtable<?, ?> environment) throws NamingException {
-        super(environment);
-    }
-
-    private ConnectionFactory ourLookup(String name){
-        if (defaultNamesMapping.contains(name)){
-            return new HedwigConnectionFactoryImpl();
-        }
-
-        return null;
-    }
-
-    @Override
-    public Object lookup(String name) throws NamingException {
-        ConnectionFactory factory = ourLookup(name);
-        if (null != factory) return factory;
-
-        return super.lookup(name);
-    }
-
-    @Override
-    public Object lookup(Name name) throws NamingException {
-        ConnectionFactory factory = ourLookup(name.toString());
-        if (null != factory) return factory;
-
-        return super.lookup(name);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContextFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContextFactory.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContextFactory.java
deleted file mode 100644
index b701aad..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContextFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.jndi;
-
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.spi.InitialContextFactory;
-import java.util.Hashtable;
-
-/**
- * See HedwigInitialContext for more information.
- */
-public class HedwigInitialContextFactory implements InitialContextFactory {
-    @Override
-    public Context getInitialContext(Hashtable<?, ?> environment) throws NamingException {
-        return new HedwigInitialContext(environment);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/package-info.html
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/package-info.html b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/package-info.html
deleted file mode 100644
index bab5787..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/package-info.html
+++ /dev/null
@@ -1,65 +0,0 @@
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-
-Contains some default SHIM's to interface with JNDI - so that clients can use our JMS provider without
-ANY code level ties.<br/>
-The ideal way to use JNDI is to have administrator configure JNDI such that a well-known jndi NAME is
-bound to "org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl" for Topic and JMS connection factory. <br/>
-<p/>
-<p/>
-Alternatively, if the user is NOT within an admin controlled JNDI environment, there are two other
-common options : <br/>
-<ul>
-  <li>Set the "java.naming.factory.initial" environment property to our Context factory
-    "org.apache.hedwig.jms.jndi.HedwigInitialContextFactory"</li>
-  <li>Directly instantiate the "org.apache.hedwig.jms.jndi.HedwigInitialContext" as a JNDI InitialContext
-    and pull the relevant factories, via it.</li>
-</ul>
-
-In either of these two cases, we expose 6 well defined 'names' for users, to pull the relevant
-factories from the JNDI context.
-<table border="1">
-  <tr>
-    <th>JNDI name</th>
-    <th>Connection factory</th>
-  </tr>
-  <tr>
-    <td>jms/ConnectionFactory</td>
-    <td>ConnectionFactory</td>
-  </tr>
-  <tr>
-    <td>jms/TopicConnectionFactory</td>
-    <td>TopicConnectionFactory</td>
-  </tr>
-  <tr>
-    <td>jms/QueueConnectionFactory</td>
-    <td>QueueConnectionFactory</td>
-  </tr>
-  <tr>
-    <td>ConnectionFactory</td>
-    <td>ConnectionFactory</td>
-  </tr>
-  <tr>
-    <td>TopicConnectionFactory</td>
-    <td>TopicConnectionFactory</td>
-  </tr>
-  <tr>
-    <td>QueueConnectionFactory</td>
-    <td>QueueConnectionFactory</td>
-  </tr>
-</table>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/BytesMessageImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/BytesMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/BytesMessageImpl.java
deleted file mode 100644
index fb564dc..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/BytesMessageImpl.java
+++ /dev/null
@@ -1,657 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.message;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.protocol.PubSubProtocol;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * To be used for raw bytes ...
- */
-public class BytesMessageImpl extends MessageImpl implements BytesMessage {
-    private ReadOnlyMessage readOnlyMessage;
-    private WriteOnlyMessage writeOnlyMessage;
-    private boolean readMode;
-
-    public BytesMessageImpl(SessionImpl session) throws JMSException {
-        super(session);
-        clearBody();
-    }
-
-    // To clone a message
-    public BytesMessageImpl(SessionImpl session, BytesMessageImpl message, String sourceTopicName,
-                            String subscriberId) throws JMSException {
-        super(session, (MessageImpl) message, sourceTopicName, subscriberId);
-        try {
-            if (message.readMode){
-                this.readOnlyMessage = new ReadOnlyMessage(message.readOnlyMessage.getDataCopy());
-                this.writeOnlyMessage = null;
-            }
-            else {
-                this.readOnlyMessage = null;
-                this.writeOnlyMessage = new WriteOnlyMessage(message.writeOnlyMessage.getPayloadAsBytes());
-            }
-        } catch (IOException e) {
-            JMSException ex = new JMSException("Unable to clone/copy input message " + message + " .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-        this.readMode = message.readMode;
-    }
-
-    // To clone a message from a BytesMessage which is NOT BytesMessageImpl
-    // Changing order of parameter to NOT accidentally clash with the constructor above.
-    // This is midly confusing, but helps a lot in preventing accidental bugs !
-    public BytesMessageImpl(BytesMessage message, SessionImpl session) throws JMSException {
-        super((Message) message, session);
-
-        if (message instanceof BytesMessageImpl) {
-            throw new JMSException("Coding bug - should use this constructor ONLY for non " +
-                "BytesMessageImpl messages");
-        }
-
-        // copy the bytes ...
-        final byte[] data;
-        {
-            final long length = message.getBodyLength();
-            if (length < 0 || length >= Integer.MAX_VALUE) throw new JMSException("Unreasonably " +
-                "large value for body Length : " + length);
-
-            data = new byte[(int) length];
-            int read = 0;
-            while (read < length){
-                int sz = message.readBytes(data, read);
-                read += sz;
-            }
-        }
-
-        try {
-            this.writeOnlyMessage = new WriteOnlyMessage(data);
-        } catch (IOException e) {
-            JMSException ex = new JMSException("Unable to clone/copy input message " + message + " .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-        this.readOnlyMessage  = null;
-        this.readMode = true;
-    }
-
-    public BytesMessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties,
-                            String sourceTopicName, String subscriberId, Runnable ackRunnable) throws JMSException {
-        super(session, message, properties, sourceTopicName, subscriberId, ackRunnable);
-
-        this.readOnlyMessage = new ReadOnlyMessage(message.getBody().toByteArray());
-        this.writeOnlyMessage = null;
-        this.readMode = true;
-    }
-
-    @Override
-    protected MessageUtil.SupportedMessageTypes  getJmsMessageType() {
-        return MessageUtil.SupportedMessageTypes.BYTES;
-    }
-
-    protected boolean isBodyEmpty(){
-        return false;
-    }
-
-    @Override
-    public PubSubProtocol.Message generateHedwigMessage() throws JMSException {
-        PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder();
-        super.populateBuilderWithHeaders(builder);
-
-        // Now set body and type.
-        try {
-            builder.setBody(ByteString.copyFrom(getPayloadData()));
-        } catch (IOException e) {
-            JMSException ex = new JMSException("Unable to read message data .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-
-        return builder.build();
-    }
-
-    @Override
-    public long getBodyLength() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        return readOnlyMessage.getBodyLength();
-    }
-
-    @Override
-    public boolean readBoolean() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readBoolean();
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public byte readByte() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readByte();
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public int readUnsignedByte() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readUnsignedByte();
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public short readShort() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readShort();
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public int readUnsignedShort() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readUnsignedShort();
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public char readChar() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readChar();
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public int readInt() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readInt();
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public long readLong() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readLong();
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public float readFloat() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readFloat();
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public double readDouble() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readDouble();
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public String readUTF() throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readUTF();
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public int readBytes(byte[] data) throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readBytes(data);
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public int readBytes(byte[] data, int length) throws JMSException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-        try {
-            return readOnlyMessage.readBytes(data, length);
-        } catch (IOException eof){
-            MessageEOFException eofEx = new MessageEOFException("eof ?");
-            eofEx.setLinkedException(eof);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void writeBoolean(boolean val) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            writeOnlyMessage.writeBoolean(val);
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void writeByte(byte val) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            writeOnlyMessage.writeByte(val);
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void writeShort(short val) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            writeOnlyMessage.writeShort(val);
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void writeChar(char val) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            writeOnlyMessage.writeChar(val);
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void writeInt(int val) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            writeOnlyMessage.writeInt(val);
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void writeLong(long val) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            writeOnlyMessage.writeLong(val);
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void writeFloat(float val) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            writeOnlyMessage.writeFloat(val);
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void writeDouble(double val) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            writeOnlyMessage.writeDouble(val);
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void writeUTF(String val) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            writeOnlyMessage.writeUTF(val);
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void writeBytes(byte[] data) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            writeOnlyMessage.writeBytes(data);
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void writeBytes(byte[] data, int offset, int length) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            writeOnlyMessage.writeBytes(data, offset, length);
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    // This method is ONLY supposed to be used for object form of primitive types !
-    @Override
-    public void writeObject(Object obj) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        try {
-            // unrolling it
-            if (obj instanceof Boolean) {
-                writeOnlyMessage.writeBoolean((Boolean) obj);
-            }
-            else if (obj instanceof Byte) {
-                writeOnlyMessage.writeByte((Byte) obj);
-            }
-            else if (obj instanceof Short) {
-                writeOnlyMessage.writeShort((Short) obj);
-            }
-            else if (obj instanceof Character) {
-                writeOnlyMessage.writeChar((Character) obj);
-            }
-            else if (obj instanceof Integer) {
-                writeOnlyMessage.writeInt((Integer) obj);
-            }
-            else if (obj instanceof Long) {
-                writeOnlyMessage.writeLong((Long) obj);
-            }
-            else if (obj instanceof Float) {
-                writeOnlyMessage.writeFloat((Float) obj);
-            }
-            else if (obj instanceof Double) {
-                writeOnlyMessage.writeDouble((Double) obj);
-            }
-            else if (obj instanceof String) {
-                writeOnlyMessage.writeUTF((String) obj);
-            }
-            else if (obj instanceof byte[]) {
-                writeOnlyMessage.writeBytes((byte[]) obj);
-            }
-            else{
-                throw new JMSException("Unsupported type for obj : " + obj.getClass());
-            }
-        } catch (IOException ioEx){
-            MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx);
-            eofEx.setLinkedException(ioEx);
-            throw eofEx;
-        }
-    }
-
-    @Override
-    public void reset() throws JMSException {
-        if (this.readMode) return ;
-        try {
-            this.readOnlyMessage = new ReadOnlyMessage(writeOnlyMessage.getPayloadAsBytes());
-        } catch (IOException e) {
-            JMSException ex = new JMSException("Unable to convert write-only message to read-only message .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-        this.readMode = true;
-        this.writeOnlyMessage = null;
-    }
-
-    @Override
-    public void clearBody() throws JMSException {
-        super.clearBody();
-        this.writeOnlyMessage = new WriteOnlyMessage();
-        this.readOnlyMessage = null;
-        this.readMode = false;
-    }
-
-    private byte[] getPayloadData() throws IOException {
-        if (readMode) return readOnlyMessage.getDataCopy();
-        return writeOnlyMessage.getPayloadAsBytes();
-    }
-
-    @Override
-    BytesMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId)
-        throws JMSException {
-        return new BytesMessageImpl(session, this, sourceTopicName, subscriberId);
-    }
-
-    // Using java object's instead of primitives to avoid having to store schema separately.
-    private static class ReadOnlyMessage {
-
-        private final DataInputStream dis;
-        private final byte[] data;
-
-        public ReadOnlyMessage(byte[] data) {
-            this.dis = new DataInputStream(new ByteArrayInputStream(data));
-            this.data = data;
-        }
-
-        public byte[] getDataCopy(){
-            byte[] retval = new byte[data.length];
-            System.arraycopy(data, 0, retval, 0, retval.length);
-            return retval;
-        }
-
-        public int getBodyLength() {
-            return data.length;
-        }
-
-        public boolean readBoolean() throws IOException {
-            return dis.readBoolean();
-        }
-
-        public byte readByte() throws IOException {
-            return dis.readByte();
-        }
-
-        public int readUnsignedByte() throws IOException {
-            return dis.readUnsignedByte();
-        }
-
-        public short readShort() throws IOException {
-            return dis.readShort();
-        }
-
-        public int readUnsignedShort() throws IOException {
-            return dis.readUnsignedShort();
-        }
-
-        public char readChar() throws IOException {
-            return dis.readChar();
-        }
-
-        public int readInt() throws IOException {
-            return dis.readInt();
-        }
-
-        public long readLong() throws IOException {
-            return dis.readLong();
-        }
-
-        public float readFloat() throws IOException {
-            return dis.readFloat();
-        }
-
-        public double readDouble() throws IOException {
-            return dis.readDouble();
-        }
-
-        public String readUTF() throws IOException {
-            return dis.readUTF();
-        }
-
-        public int readBytes(byte[] data) throws IOException {
-            return dis.read(data);
-        }
-
-        public int readBytes(byte[] data, int length) throws IOException {
-            if (length < 0 || length > data.length)
-              throw new IndexOutOfBoundsException("Invalid length specified : " + length + ", data : " + data.length);
-            return dis.read(data, 0, length);
-        }
-    }
-
-    private static class WriteOnlyMessage {
-
-        private final ByteArrayOutputStream baos;
-        private final DataOutputStream dos;
-
-        public WriteOnlyMessage(){
-            baos = new ByteArrayOutputStream();
-            dos = new DataOutputStream(baos);
-        }
-
-        public WriteOnlyMessage(byte[] data) throws IOException {
-            baos = new ByteArrayOutputStream();
-            dos = new DataOutputStream(baos);
-            dos.write(data);
-        }
-
-        public byte[] getPayloadAsBytes() throws IOException {
-            dos.flush();
-            return baos.toByteArray();
-        }
-
-        public void writeBoolean(boolean val) throws IOException {
-            dos.writeBoolean(val);
-        }
-
-        public void writeByte(byte val) throws IOException {
-            dos.writeByte(val);
-        }
-
-        public void writeShort(short val) throws IOException {
-            dos.writeShort(val);
-        }
-
-        public void writeChar(char val) throws IOException {
-            dos.writeChar(val);
-        }
-
-        public void writeInt(int val) throws IOException {
-            dos.writeInt(val);
-        }
-
-        public void writeLong(long val) throws IOException {
-            dos.writeLong(val);
-        }
-
-        public void writeFloat(float val) throws IOException {
-            dos.writeFloat(val);
-        }
-
-        public void writeDouble(double val) throws IOException {
-            dos.writeDouble(val);
-        }
-
-        public void writeUTF(String val) throws IOException {
-            dos.writeUTF(val);
-        }
-
-        public void writeBytes(byte[] data) throws IOException {
-            dos.write(data);
-        }
-
-        public void writeBytes(byte[] data, int offset, int length) throws IOException {
-            dos.write(data, offset, length);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MapMessageImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MapMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MapMessageImpl.java
deleted file mode 100644
index af806fb..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MapMessageImpl.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.message;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.protocol.PubSubProtocol;
-
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageNotWriteableException;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * There is a weaker expectation of ordering and strong expectation of &lt;key, value&gt; container for data.
- */
-public class MapMessageImpl extends MessageImpl implements MapMessage {
-    private final Map<String, Object> payload = new LinkedHashMap<String, Object>(4);
-    private boolean readMode;
-
-    public MapMessageImpl(SessionImpl session) throws JMSException {
-        super(session);
-        clearBody();
-    }
-
-    public MapMessageImpl(SessionImpl session, MapMessageImpl message, String sourceTopicName,
-                          String subscriberId) throws JMSException {
-        super(session, (MessageImpl) message, sourceTopicName, subscriberId);
-        this.payload.putAll(message.payload);
-        this.readMode = message.readMode;
-    }
-
-    // To clone a message from a MapMessage which is NOT MapMessageImpl
-    // Changing order of parameter to NOT accidentally clash with the constructor above.
-    // This is midly confusing, but helps a lot in preventing accidental bugs !
-    public MapMessageImpl(MapMessage message, SessionImpl session) throws JMSException {
-        super((Message) message, session);
-
-        if (message instanceof MapMessageImpl) {
-            throw new JMSException("Coding bug - should use this constructor ONLY for non MapMessageImpl messages");
-        }
-
-
-        Enumeration keys = message.getMapNames();
-        while (keys.hasMoreElements()){
-            Object key = keys.nextElement();
-            if (!(key instanceof String))
-              throw new JMSException("Unsupported type (expected String) for key : " + key);
-
-            String skey = (String) key;
-            this.payload.put(skey, message.getObject(skey));
-        }
-        this.readMode = false;
-    }
-
-    @SuppressWarnings("unchecked")
-    public MapMessageImpl(SessionImpl session, PubSubProtocol.Message message,
-                          Map<String, Object> properties, String sourceTopicName, String subscriberId,
-                          Runnable ackRunnable) throws JMSException {
-        super(session, message, properties, sourceTopicName, subscriberId, ackRunnable);
-        try {
-            this.payload.putAll((Map<String, Object>) MessageUtil.bytesToObject(message.getBody().toByteArray()));
-        } catch (IOException e) {
-            JMSException ex = new JMSException("Unable to read message data .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-        this.readMode = true;
-    }
-
-    @Override
-    protected MessageUtil.SupportedMessageTypes  getJmsMessageType() {
-        return MessageUtil.SupportedMessageTypes.MAP;
-    }
-
-    protected boolean isBodyEmpty(){
-        return false;
-    }
-
-    @Override
-    public PubSubProtocol.Message generateHedwigMessage() throws JMSException {
-        PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder();
-        super.populateBuilderWithHeaders(builder);
-
-        // Now set body and type.
-        try {
-            builder.setBody(ByteString.copyFrom(MessageUtil.objectToBytes(this.payload)));
-        } catch (IOException e) {
-            JMSException ex = new JMSException("Unable to read message data .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-
-        return builder.build();
-    }
-
-    @Override
-    public boolean getBoolean(String name) throws JMSException {
-        return MessageUtil.asBoolean(payload.get(name));
-    }
-
-    @Override
-    public byte getByte(String name) throws JMSException {
-        return MessageUtil.asByte(payload.get(name));
-    }
-
-    @Override
-    public short getShort(String name) throws JMSException {
-        return MessageUtil.asShort(payload.get(name));
-    }
-
-    @Override
-    public char getChar(String name) throws JMSException {
-        return MessageUtil.asChar(payload.get(name));
-    }
-
-    @Override
-    public int getInt(String name) throws JMSException {
-        return MessageUtil.asInteger(payload.get(name));
-    }
-
-    @Override
-    public long getLong(String name) throws JMSException {
-        return MessageUtil.asLong(payload.get(name));
-    }
-
-    @Override
-    public float getFloat(String name) throws JMSException {
-        return MessageUtil.asFloat(payload.get(name));
-    }
-
-    @Override
-    public double getDouble(String name) throws JMSException {
-        return MessageUtil.asDouble(payload.get(name));
-    }
-
-    @Override
-    public String getString(String name) throws JMSException {
-        return MessageUtil.asString(payload.get(name));
-    }
-
-    @Override
-    public byte[] getBytes(String name) throws JMSException {
-        return MessageUtil.asBytes(payload.get(name));
-    }
-
-    @Override
-    public Object getObject(String name) throws JMSException {
-        return payload.get(name);
-    }
-
-    @Override
-    public Enumeration getMapNames() throws JMSException {
-        return Collections.enumeration(payload.keySet());
-    }
-
-    @Override
-    public void setBoolean(String name, boolean value) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-    }
-
-    @Override
-    public void setByte(String name, byte value) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-
-    }
-
-    @Override
-    public void setShort(String name, short value) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-
-    }
-
-    @Override
-    public void setChar(String name, char value) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-
-    }
-
-    @Override
-    public void setInt(String name, int value) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-
-    }
-
-    @Override
-    public void setLong(String name, long value) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-
-    }
-
-    @Override
-    public void setFloat(String name, float value) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-
-    }
-
-    @Override
-    public void setDouble(String name, double value) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-
-    }
-
-    @Override
-    public void setString(String name, String value) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-
-    }
-
-    @Override
-    public void setBytes(String name, byte[] value) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-
-    }
-
-    @Override
-    public void setBytes(String name, byte[] value, int i, int i1) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-
-    }
-
-    @Override
-    public void setObject(String name, Object value) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name);
-        payload.put(name, value);
-
-    }
-
-    @Override
-    public boolean itemExists(String name) throws JMSException {
-        return payload.containsKey(name);
-    }
-
-    @Override
-    public void clearBody() throws JMSException {
-        super.clearBody();
-        // allow read and write.
-        this.payload.clear();
-        this.readMode = false;
-    }
-
-    @Override
-    public void reset() throws JMSException {
-        if (this.readMode) return ;
-        this.readMode = true;
-    }
-
-    @Override
-    MapMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId) throws JMSException {
-        return new MapMessageImpl(session, this, sourceTopicName, subscriberId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageImpl.java
deleted file mode 100644
index d13feb0..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageImpl.java
+++ /dev/null
@@ -1,872 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.message;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.message.header.MetadataProcessor;
-import org.apache.hedwig.jms.selector.SelectorEvaluationException;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageNotWriteableException;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Implementation of a message.
- */
-public class MessageImpl implements Message {
-
-    // This is of type byte for now - enough ?
-    public static final String JMS_MESSAGE_TYPE_KEY = "jmsBodyType";
-    // 'others' (non-jms clients) can depend on this boolean metadata property : for now, part
-    // of jms values directly due to how metadata is being designed !
-    // sigh :-(
-    public static final String EMPTY_BODY_KEY = "bodyEmpty";
-
-
-    private final static Logger logger = LoggerFactory.getLogger(MessageImpl.class);
-
-    public static final String JMS_MESSAGE_ID = "JMSMessageID";
-    public static final String JMS_TIMESTAMP = "JMSTimestamp";
-    public static final String JMS_CORRELATION_ID = "JMSCorrelationID";
-    public static final String JMS_REPLY_TO = "JMSReplyTo";
-    public static final String JMS_DESTINATION = "JMSDestination";
-    public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode";
-    public static final String JMS_REDELIVERED = "JMSRedelivered";
-    public static final String JMS_TYPE = "JMSType";
-    public static final String JMS_EXPIRATION = "JMSExpiration";
-    public static final String JMS_PRIORITY = "JMSPriority";
-
-    private static final Set<String> standardProperties;
-    static {
-        Set<String> set = new HashSet<String>(16);
-        set.add(JMS_MESSAGE_ID);
-        set.add(JMS_TIMESTAMP);
-        set.add(JMS_CORRELATION_ID);
-
-        set.add(JMS_REPLY_TO);
-        set.add(JMS_DESTINATION);
-        set.add(JMS_DELIVERY_MODE);
-
-        // Currently simulated in provider - NOT from hedwig.
-        set.add(JMS_REDELIVERED);
-        set.add(JMS_TYPE);
-
-        set.add(JMS_EXPIRATION);
-        set.add(JMS_PRIORITY);
-
-        standardProperties = Collections.unmodifiableSet(set);
-    }
-
-    private final SessionImpl session;
-    private final String serverJmsMessageId;
-
-    private String jmsMessageId;
-    private long jmsTimestamp = 0;
-    private String jmsCorrelationID;
-
-    private Destination jmsReplyTo;
-    private Destination jmsDestination;
-    private int jmsDeliveryMode = DeliveryMode.PERSISTENT;
-
-    private boolean jmsRedelivered;
-    private String jmsType;
-
-    private long jmsExpiration = 0L;
-    private int jmsPriority = Message.DEFAULT_PRIORITY;
-
-    // Note: this DOES NOT contain standard headers - which are explicitly handled.
-    private boolean propertiesReadOnly = false;
-    protected Map<String, Object> properties = new HashMap<String, Object>(4);
-
-    // key == standard property.
-    private Set<String> standardPropertiesExists = new HashSet<String>(16);
-    private Set<String> standardPropertiesExistsForWire = new HashSet<String>(16);
-
-    private final String sourceName;
-    private final String subscriberId;
-
-    private final Runnable ackRunnable;
-
-    // This is to be set to true ONLY for testing - NOT otherwise !
-    // The JMS api DOES NOT expose this ...
-    // private boolean allowSpecifyJMSMessageIDForTest;
-
-    //private final PubSubProtocol.Message rawMessage;
-
-    public MessageImpl(SessionImpl session){
-        this.session = session;
-
-        this.sourceName = null;
-        this.subscriberId = null;
-        this.ackRunnable = null;
-        this.serverJmsMessageId = null;
-        // this.rawMessage = null;
-    }
-
-    MessageImpl(SessionImpl session, MessageImpl message, String sourceName, String subscriberId)
-        throws JMSException {
-        this.session = session;
-        this.sourceName = sourceName;
-        this.subscriberId = subscriberId;
-        this.ackRunnable = message.getAckRunnable();
-        this.serverJmsMessageId = message.getServerJmsMessageId();
-        // this.rawMessage = null;
-
-        // Copy all properties from message to this class.
-
-        this.properties.putAll(message.properties);
-
-        // Now copy rest of the state over ...
-        if (message.propertyExists(JMS_MESSAGE_ID)) setJMSMessageIDInternal(message.getJMSMessageID());
-        if (message.propertyExists(JMS_TIMESTAMP)) setJMSTimestamp(message.getJMSTimestamp());
-        if (message.propertyExists(JMS_CORRELATION_ID)) setJMSCorrelationID(message.getJMSCorrelationID());
-        // We do not support this right now.
-        // if (message.propertyExists(JMS_CORRELATION_ID_AS_BYTES))
-        //    setJMSCorrelationIDAsBytes(message.getJMSCorrelationIDAsBytes());
-        if (message.propertyExists(JMS_REPLY_TO)) setJMSReplyTo(message.getJMSReplyTo());
-        if (message.propertyExists(JMS_DESTINATION)) setJMSDestination(message.getJMSDestination());
-        if (message.propertyExists(JMS_DELIVERY_MODE)) setJMSDeliveryMode(message.getJMSDeliveryMode());
-        if (message.propertyExists(JMS_REDELIVERED)) setJMSRedelivered(message.getJMSRedelivered());
-        if (message.propertyExists(JMS_TYPE)) setJMSType(message.getJMSType());
-        if (message.propertyExists(JMS_EXPIRATION)) setJMSExpiration(message.getJMSExpiration());
-        if (message.propertyExists(JMS_PRIORITY)) setJMSPriority(message.getJMSPriority());
-
-        this.propertiesReadOnly = message.propertiesReadOnly;
-    }
-
-    // To clone a message from a Message which is NOT MessageImpl
-    // Changing order of parameter to NOT accidentally clash with the constructor above.
-    // This is midly confusing, but helps a lot in preventing accidental bugs !
-    MessageImpl(Message message, SessionImpl session) throws JMSException {
-        this.session = session;
-        this.sourceName = null;
-        this.subscriberId = null;
-        this.ackRunnable = null;
-        this.serverJmsMessageId = null;
-        // this.rawMessage = null;
-
-        assert (! (message instanceof MessageImpl ));
-
-        // Copy all properties from message to this class.
-        Enumeration names = message.getPropertyNames();
-        while (names.hasMoreElements()){
-            Object name = names.nextElement();
-            if (!(name instanceof String))
-              throw new JMSException("Unsupported type (expected String) for key : " + name);
-
-            String sname = (String) name;
-            this.properties.put(sname, message.getObjectProperty(sname));
-        }
-
-        // Now copy rest of the state over ...
-        // JMS VIOLATION: we will be unable to check for propertyExists after this,
-        //  at sender and receiver side ... sigh :-(
-        setJMSMessageIDInternal(message.getJMSMessageID());
-        setJMSTimestamp(message.getJMSTimestamp());
-        setJMSCorrelationID(message.getJMSCorrelationID());
-        // We do not support this right now.
-        // setJMSCorrelationIDAsBytes(message.getJMSCorrelationIDAsBytes());
-        setJMSReplyTo(message.getJMSReplyTo());
-        setJMSDestination(message.getJMSDestination());
-        setJMSDeliveryMode(message.getJMSDeliveryMode());
-        setJMSRedelivered(message.getJMSRedelivered());
-        setJMSType(message.getJMSType());
-        setJMSExpiration(message.getJMSExpiration());
-        setJMSPriority(message.getJMSPriority());
-
-        // Should be able to modify, right ?
-        this.propertiesReadOnly = false;
-
-        // remove all jms standard keys from properties now : this should ideally result in zero
-        // removals ... but we never know with client code !
-        for (String key : standardProperties) properties.remove(key);
-    }
-
-    MessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties,
-                String sourceName, String subscriberId, Runnable ackRunnable) throws JMSException {
-        this.session = session;
-        this.sourceName = sourceName;
-        this.subscriberId = subscriberId;
-        this.ackRunnable = ackRunnable;
-        // this.rawMessage = message;
-
-        // setJMSMessageID(getStringProperty(properties, JMS_MESSAGE_ID));
-        setJMSMessageIDInternal(MessageUtil.generateJMSMessageIdFromSeqId(message.getMsgId()));
-        this.serverJmsMessageId = getJMSMessageID();
-
-        if (properties.containsKey(JMS_TIMESTAMP)) setJMSTimestamp(getLongProperty(properties, JMS_TIMESTAMP));
-        if (properties.containsKey(JMS_CORRELATION_ID)) setJMSCorrelationID(
-            getStringProperty(properties, JMS_CORRELATION_ID));
-        if (null != getStringProperty(properties, JMS_REPLY_TO)) {
-            setJMSReplyTo(
-                    session.getDestination(session.findDestinationType(getStringProperty(properties, JMS_REPLY_TO)),
-                        getStringProperty(properties, JMS_REPLY_TO)
-                    ));
-        }
-        if (null != getStringProperty(properties, JMS_DESTINATION)) {
-            setJMSDestination(
-                    session.getDestination(session.findDestinationType(
-                        getStringProperty(properties, JMS_DESTINATION)),
-                        getStringProperty(properties, JMS_DESTINATION)
-                    ));
-        }
-
-        if (properties.containsKey(JMS_DELIVERY_MODE)) setJMSDeliveryMode(
-            getIntProperty(properties, JMS_DELIVERY_MODE));
-        if (properties.containsKey(JMS_TYPE)) setJMSType(getStringProperty(properties, JMS_TYPE));
-
-        if (properties.containsKey(JMS_EXPIRATION)) setJMSExpiration(
-            getLongProperty(properties, JMS_EXPIRATION));
-        if (properties.containsKey(JMS_PRIORITY)) setJMSPriority(
-            getIntProperty(properties, JMS_PRIORITY));
-
-
-        // remove all jms standard keys from properties now : this should result in zero removals ...
-        // but adding anyway.
-        for (String key : standardProperties) properties.remove(key);
-
-        // Immutable after reading from stream !
-        this.propertiesReadOnly = true;
-        this.properties.putAll(properties);
-    }
-
-    protected MessageUtil.SupportedMessageTypes getJmsMessageType(){
-        // Validate against coding bug ... this MUST be overridden in all subclasses.
-        if (getClass() != MessageImpl.class)
-          throw new IllegalStateException("This method must be overridden by subclasses. class : " + getClass());
-        return MessageUtil.SupportedMessageTypes.ONLY_MESSAGE;
-    }
-
-    public PubSubProtocol.Message generateHedwigMessage() throws JMSException {
-        // This is to be called ONLY from the base class - all children MUST override it and NOT delegate to it.
-        if (getClass() != MessageImpl.class) {
-            throw new JMSException("Unexpected to call MessageImpl's generateHedwigMessage from subclass " +
-                getClass());
-        }
-
-        PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder();
-        populateBuilderWithHeaders(builder);
-        // no body - will be appropriately set in populateBuilderWithHeaders().
-        return builder.build();
-    }
-
-    protected boolean isBodyEmpty(){
-        return true;
-    }
-
-    /*
-    protected void markEmptyBody(PubSubProtocol.Message.Builder builder) {
-        MetadataProcessor.addBooleanProperty(builder, EMPTY_BODY_KEY, true);
-        builder.setBody(ByteString.EMPTY);
-    }
-    */
-
-    protected boolean hasBodyFromProperties() {
-        // if key missing (common case), then there is body.
-        if (!properties.containsKey(EMPTY_BODY_KEY)) return true;
-        // If present, then check if it is a boolean of value true.
-        Object value = properties.get(EMPTY_BODY_KEY);
-
-        // special case null.
-        if (null == value) return true;
-        if (value instanceof Boolean) return ! (Boolean) value;
-
-        // unknown type ...
-        logger.info("Unknown type for value of " + EMPTY_BODY_KEY + " in message properties : " + value);
-        // assume true by default.
-        return true;
-    }
-
-
-    protected final void populateBuilderWithHeaders(PubSubProtocol.Message.Builder builder) throws JMSException {
-
-        Map<String, Object> propertiesCopy = new HashMap<String, Object>(properties);
-        if (isBodyEmpty()) {
-            propertiesCopy.put(EMPTY_BODY_KEY, true);
-            builder.setBody(ByteString.EMPTY);
-        }
-        // Not setting unless required to reduce message size - change this ?
-        // else propertiesCopy.put(EMPTY_BODY_KEY, false);
-
-        Iterator<Map.Entry<String, Object>> iter = propertiesCopy.entrySet().iterator();
-        while (iter.hasNext()){
-            Map.Entry<String, Object> entry = iter.next();
-            if (standardProperties.contains(entry.getKey())) {
-                if (logger.isInfoEnabled())
-                  logger.info("Ignoring user attempt to set standard property as application property : " + entry);
-                iter.remove();
-            }
-        }
-
-
-        // set jms message type.
-        propertiesCopy.put(JMS_MESSAGE_TYPE_KEY, getJmsMessageType().getType());
-        if (standardPropertiesExistsForWire.contains(JMS_CORRELATION_ID))
-          propertiesCopy.put(JMS_CORRELATION_ID, getJMSCorrelationID());
-
-        // unsupported for now.
-        // if (standardPropertiesExistsForWire.contains(JMS_CORRELATION_ID_AS_BYTES))
-        //    propertiesCopy.put(JMS_CORRELATION_ID_AS_BYTES, getJMSCorrelationIDAsBytes());
-        if (standardPropertiesExistsForWire.contains(JMS_DELIVERY_MODE))
-          propertiesCopy.put(JMS_DELIVERY_MODE, getJMSDeliveryMode());
-
-        if (standardPropertiesExistsForWire.contains(JMS_DESTINATION))
-          propertiesCopy.put(JMS_DESTINATION, session.toName(getJMSDestination()));
-        if (standardPropertiesExistsForWire.contains(JMS_EXPIRATION))
-          propertiesCopy.put(JMS_EXPIRATION, getJMSExpiration());
-
-        // This can be set by client - but we ignore it in hedwig.
-        // if (standardPropertiesExistsForWire.contains(JMS_MESSAGE_ID))
-        //    propertiesCopy.put(JMS_MESSAGE_ID, getJMSMessageID());
-
-        // We do not support priority - but we are gong to allow it to be specified : this is
-        // for selectors to set conditions on it !
-        if (standardPropertiesExistsForWire.contains(JMS_PRIORITY))
-          propertiesCopy.put(JMS_PRIORITY, getJMSPriority());
-
-        // this is not to be sent to hedwig.
-        // if (standardPropertiesExistsForWire.contains(JMS_REDELIVERED))
-        //    propertiesCopy.put(JMS_REDELIVERED, getJMSRedelivered());
-
-        if (standardPropertiesExistsForWire.contains(JMS_REPLY_TO))
-          propertiesCopy.put(JMS_REPLY_TO, session.toName(getJMSReplyTo()));
-
-
-        propertiesCopy.put(JMS_TIMESTAMP, getJMSTimestamp());
-        if (standardPropertiesExistsForWire.contains(JMS_TYPE)) propertiesCopy.put(JMS_TYPE, getJMSType());
-
-
-        MetadataProcessor.addHeaders(builder, propertiesCopy);
-    }
-
-    @Override
-    public String getJMSMessageID() {
-        return jmsMessageId;
-    }
-
-    @Override
-    public void setJMSMessageID(String jmsMessageId) throws JMSException {
-        // JMS VIOLATION ... we are NOT allowing client to override jms message-id.
-        // if (!allowSpecifyJMSMessageIDForTest)
-        //    throw new JMSException("We do not allow setting jms message id. This will be ignored by hedwig anyway.");
-        if (logger.isDebugEnabled()) logger.debug("Setting this is irrelevant - we override it anyway - " +
-            " hedwig does not allow specifying it explictly.");
-        setJMSMessageIDInternal(jmsMessageId);
-    }
-
-    public void setJMSMessageIDInternal(String jmsMessageId) throws JMSException {
-        this.jmsMessageId = jmsMessageId;
-        if (null != jmsMessageId){
-            // We do not allow sending the property over wire.
-            this.standardPropertiesExists.add(JMS_MESSAGE_ID);
-            // this.standardPropertiesExistsForWire.add(JMS_MESSAGE_ID);
-        }
-        else {
-            this.standardPropertiesExists.remove(JMS_MESSAGE_ID);
-            // this.standardPropertiesExistsForWire.remove(JMS_MESSAGE_ID);
-        }
-    }
-
-    // The immutable message Id set by the server.
-    public String getServerJmsMessageId() {
-        return serverJmsMessageId;
-    }
-
-    @Override
-    public long getJMSTimestamp() {
-        return jmsTimestamp;
-    }
-
-    @Override
-    public void setJMSTimestamp(long jmsTimestamp) {
-        this.jmsTimestamp = jmsTimestamp;
-        this.standardPropertiesExists.add(JMS_TIMESTAMP);
-        // this.standardPropertiesExistsForWire.add(JMS_TIMESTAMP);
-    }
-
-    @Override
-    public byte[] getJMSCorrelationIDAsBytes() {
-        throw new UnsupportedOperationException("unsupported");
-    }
-
-    @Override
-    public void setJMSCorrelationIDAsBytes(byte[] bytes) {
-        throw new UnsupportedOperationException("unsupported");
-    }
-
-    @Override
-    public void setJMSCorrelationID(String jmsCorrelationID) {
-        this.jmsCorrelationID = jmsCorrelationID;
-        if (null != jmsCorrelationID){
-            this.standardPropertiesExists.add(JMS_CORRELATION_ID);
-            this.standardPropertiesExistsForWire.add(JMS_CORRELATION_ID);
-        }
-        else {
-            this.standardPropertiesExists.remove(JMS_CORRELATION_ID);
-            this.standardPropertiesExistsForWire.remove(JMS_CORRELATION_ID);
-        }
-    }
-
-    @Override
-    public String getJMSCorrelationID() {
-        return jmsCorrelationID;
-    }
-
-    @Override
-    public Destination getJMSReplyTo() {
-        return jmsReplyTo;
-    }
-
-    @Override
-    public void setJMSReplyTo(Destination jmsReplyTo) {
-        this.jmsReplyTo = jmsReplyTo;
-        if (null != jmsReplyTo){
-            this.standardPropertiesExists.add(JMS_REPLY_TO);
-            this.standardPropertiesExistsForWire.add(JMS_REPLY_TO);
-        }
-        else {
-            this.standardPropertiesExists.remove(JMS_REPLY_TO);
-            this.standardPropertiesExistsForWire.remove(JMS_REPLY_TO);
-        }
-    }
-
-    @Override
-    public Destination getJMSDestination() {
-        return jmsDestination;
-    }
-
-    @Override
-    public void setJMSDestination(Destination jmsDestination) {
-        this.jmsDestination = jmsDestination;
-        if (null != jmsDestination){
-            this.standardPropertiesExists.add(JMS_DESTINATION);
-            this.standardPropertiesExistsForWire.add(JMS_DESTINATION);
-        }
-        else {
-            this.standardPropertiesExists.remove(JMS_DESTINATION);
-            this.standardPropertiesExistsForWire.remove(JMS_DESTINATION);
-        }
-    }
-
-    @Override
-    public int getJMSDeliveryMode() {
-        return jmsDeliveryMode;
-    }
-
-    @Override
-    public void setJMSDeliveryMode(int jmsDeliveryMode) {
-        this.jmsDeliveryMode = jmsDeliveryMode;
-        this.standardPropertiesExists.add(JMS_DELIVERY_MODE);
-        this.standardPropertiesExistsForWire.add(JMS_DELIVERY_MODE);
-    }
-
-    @Override
-    public boolean getJMSRedelivered() {
-        return jmsRedelivered;
-    }
-
-    @Override
-    public void setJMSRedelivered(boolean jmsRedelivered) {
-        this.jmsRedelivered = jmsRedelivered;
-        this.standardPropertiesExists.add(JMS_REDELIVERED);
-        // this.standardPropertiesExistsForWire.add(JMS_REDELIVERED);
-    }
-
-    @Override
-    public String getJMSType() {
-        return jmsType;
-    }
-
-    @Override
-    public void setJMSType(String jmsType) {
-        this.jmsType = jmsType;
-        if (null != jmsType){
-            this.standardPropertiesExists.add(JMS_TYPE);
-            this.standardPropertiesExistsForWire.add(JMS_TYPE);
-        }
-        else {
-            this.standardPropertiesExists.remove(JMS_TYPE);
-            this.standardPropertiesExistsForWire.remove(JMS_TYPE);
-        }
-    }
-
-    @Override
-    public long getJMSExpiration() {
-        return jmsExpiration;
-    }
-
-    @Override
-    public void setJMSExpiration(long jmsExpiration) {
-        // We simulate it now !
-        // if (logger.isInfoEnabled()) logger.info("JMSExpiration is not supported right now by Hedwig ...");
-        this.jmsExpiration = jmsExpiration;
-
-        if (0 != jmsExpiration){
-            this.standardPropertiesExists.add(JMS_EXPIRATION);
-            this.standardPropertiesExistsForWire.add(JMS_EXPIRATION);
-        }
-        else {
-            this.standardPropertiesExists.remove(JMS_EXPIRATION);
-            this.standardPropertiesExistsForWire.remove(JMS_EXPIRATION);
-        }
-    }
-
-    @Override
-    public int getJMSPriority() {
-        return jmsPriority;
-    }
-
-    @Override
-    public void setJMSPriority(int jmsPriority) {
-        this.jmsPriority = jmsPriority;
-        this.standardPropertiesExists.add(JMS_PRIORITY);
-        // Sent over wire ?
-        this.standardPropertiesExistsForWire.add(JMS_PRIORITY);
-    }
-
-    @Override
-    public void clearProperties() {
-        this.propertiesReadOnly = false;
-        properties.clear();
-    }
-
-    /**
-     * JMS VIOLATION ? The spec & javadoc is unclear as to whether this method must include jms
-     * standard properties or not.
-     * But going by javadoc of
-     * @see #getPropertyNames() , we have this specified :
-     * "Note that JMS standard header fields are not considered properties and are not returned
-     * in this enumeration."
-     * Which indicates this method must not include standard properties.
-     */
-    @Override
-    public boolean propertyExists(String key) {
-        if (!standardProperties.contains(key)) return properties.containsKey(key);
-
-        // Evaluate depending on type of property.
-        return standardPropertiesExists.contains(key);
-    }
-
-    @Override
-    public boolean getBooleanProperty(String key) throws JMSException {
-        checkIfStandardProperty(key);
-        return getBooleanProperty(properties, key);
-    }
-
-    private boolean getBooleanProperty(Map<String, Object> properties, String key) throws JMSException {
-        return MessageUtil.asBoolean(properties.get(key));
-    }
-
-    @Override
-    public byte getByteProperty(String key) throws JMSException {
-        checkIfStandardProperty(key);
-        return getByteProperty(properties, key);
-    }
-
-    private byte getByteProperty(Map<String, Object> properties, String key) throws JMSException {
-        return MessageUtil.asByte(properties.get(key));
-    }
-
-    @Override
-    public short getShortProperty(String key) throws JMSException {
-        checkIfStandardProperty(key);
-        return getShortProperty(properties, key);
-    }
-
-    private short getShortProperty(Map<String, Object> properties, String key) throws JMSException {
-        return MessageUtil.asShort(properties.get(key));
-    }
-
-    @Override
-    public int getIntProperty(String key) throws JMSException {
-        checkIfStandardProperty(key);
-        return getIntProperty(properties, key);
-    }
-
-    private int getIntProperty(Map<String, Object> properties, String key) throws JMSException {
-        return MessageUtil.asInteger(properties.get(key));
-    }
-
-    @Override
-    public long getLongProperty(String key) throws JMSException {
-        checkIfStandardProperty(key);
-        return getLongProperty(properties, key);
-    }
-
-    private long getLongProperty(Map<String, Object> properties, String key) throws JMSException {
-        return MessageUtil.asLong(properties.get(key));
-    }
-
-    @Override
-    public float getFloatProperty(String key) throws JMSException {
-        checkIfStandardProperty(key);
-        return getFloatProperty(properties, key);
-    }
-
-    private float getFloatProperty(Map<String, Object> properties, String key) throws JMSException {
-        return MessageUtil.asFloat(properties.get(key));
-    }
-
-    @Override
-    public double getDoubleProperty(String key) throws JMSException {
-        checkIfStandardProperty(key);
-        return getDoubleProperty(properties, key);
-    }
-
-    private double getDoubleProperty(Map<String, Object> properties, String key) throws JMSException {
-        return MessageUtil.asDouble(properties.get(key));
-    }
-
-    public Object getSelectorProcessingPropertyValue(String key) throws SelectorEvaluationException {
-        if (properties.containsKey(key)) return properties.get(key);
-        if (! standardProperties.contains(key)) return null;
-
-        if (JMS_MESSAGE_ID.equals(key)) return getJMSMessageID();
-        if (JMS_TIMESTAMP.equals(key)) return getJMSTimestamp();
-        if (JMS_CORRELATION_ID.equals(key)) return getJMSCorrelationID();
-        // We do not support this right now.
-        // if (JMS_CORRELATION_ID_AS_BYTES.equals(key)) return getJMSCorrelationIDAsBytes();
-        if (JMS_REPLY_TO.equals(key)) return getJMSReplyTo();
-        if (JMS_DESTINATION.equals(key)) return getJMSDestination();
-        if (JMS_DELIVERY_MODE.equals(key)) {
-            // 3.8.1.3 Special Notes "When used in a message selector JMSDeliveryMode is treated as having the
-            // values ‘PERSISTENT’ and ‘NON_PERSISTENT’."
-            final int deliveryMode = getJMSDeliveryMode();
-            if (DeliveryMode.PERSISTENT == deliveryMode) return "PERSISTENT";
-            if (DeliveryMode.NON_PERSISTENT == deliveryMode) return "NON_PERSISTENT";
-            // unknown !
-            if (logger.isInfoEnabled()) logger.info("Unknown delivery mode specified ... " + deliveryMode);
-            return null;
-        }
-        if (JMS_REDELIVERED.equals(key)) return getJMSRedelivered();
-        if (JMS_TYPE.equals(key)) return getJMSType();
-        if (JMS_EXPIRATION.equals(key)) return getJMSExpiration();
-        if (JMS_PRIORITY.equals(key)) return getJMSPriority();
-
-        throw new SelectorEvaluationException("Unable to retrieve value for key : '" + key + "'");
-    }
-
-    @Override
-    public String getStringProperty(String key) throws JMSException {
-        checkIfStandardProperty(key);
-        return getStringProperty(properties, key);
-    }
-
-    private String getStringProperty(Map<String, Object> properties, String key) throws JMSException {
-        return MessageUtil.asString(properties.get(key));
-    }
-
-    @Override
-    public Object getObjectProperty(String key) throws JMSException {
-        checkIfStandardProperty(key);
-        // if (!propertyExists(key)) return null;
-
-        return properties.get(key);
-    }
-
-    @Override
-    public Enumeration<String> getPropertyNames() throws JMSException {
-        return Collections.enumeration(properties.keySet());
-    }
-
-    @Override
-    public void setBooleanProperty(String key, boolean value) throws JMSException {
-        if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key);
-        if (propertiesReadOnly)
-          throw new MessageNotWriteableException("Message not writable. attempt to set property " +
-              key + " = " + value);
-        checkIfStandardProperty(key);
-        properties.put(key, value);
-    }
-
-    @Override
-    public void setByteProperty(String key, byte value) throws JMSException {
-        if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key);
-        if (propertiesReadOnly)
-          throw new MessageNotWriteableException("Message not writable. attempt to set property " +
-              key + " = " + value);
-        checkIfStandardProperty(key);
-        properties.put(key, value);
-    }
-
-    @Override
-    public void setShortProperty(String key, short value) throws JMSException {
-        if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key);
-        if (propertiesReadOnly)
-          throw new MessageNotWriteableException("Message not writable. attempt to set property " +
-              key + " = " + value);
-        checkIfStandardProperty(key);
-        properties.put(key, value);
-    }
-
-    @Override
-    public void setIntProperty(String key, int value) throws JMSException {
-        if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key);
-        if (propertiesReadOnly)
-          throw new MessageNotWriteableException("Message not writable. attempt to set property " +
-              key + " = " + value);
-        checkIfStandardProperty(key);
-        properties.put(key, value);
-    }
-
-    @Override
-    public void setLongProperty(String key, long value) throws JMSException {
-        if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key);
-        if (propertiesReadOnly)
-          throw new MessageNotWriteableException("Message not writable. attempt to set property " +
-              key + " = " + value);
-        checkIfStandardProperty(key);
-        properties.put(key, value);
-    }
-
-    @Override
-    public void setFloatProperty(String key, float value) throws JMSException {
-        if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key);
-        if (propertiesReadOnly)
-          throw new MessageNotWriteableException("Message not writable. attempt to set property " +
-              key + " = " + value);
-        checkIfStandardProperty(key);
-        properties.put(key, value);
-    }
-
-    @Override
-    public void setDoubleProperty(String key, double value) throws JMSException {
-        if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key);
-        if (propertiesReadOnly)
-          throw new MessageNotWriteableException("Message not writable. attempt to set property " +
-              key + " = " + value);
-        checkIfStandardProperty(key);
-        properties.put(key, value);
-    }
-
-    @Override
-    public void setStringProperty(String key, String value) throws JMSException {
-        if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key);
-        if (propertiesReadOnly)
-          throw new MessageNotWriteableException("Message not writable. attempt to set property " +
-              key + " = " + value);
-        checkIfStandardProperty(key);
-        properties.put(key, value);
-    }
-
-    @Override
-    public void setObjectProperty(String key, Object value) throws JMSException {
-        if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key);
-        if (propertiesReadOnly)
-          throw new MessageNotWriteableException("Message not writable. attempt to set property " +
-              key + " = " + value);
-        checkIfStandardProperty(key);
-
-        if (null == value ||
-                value instanceof Boolean ||
-                value instanceof Byte ||
-                value instanceof Short ||
-                value instanceof Integer ||
-                value instanceof Long ||
-                value instanceof Float ||
-                value instanceof Double ||
-                value instanceof byte[] ||
-                value instanceof String) {
-            properties.put(key, value);
-            return ;        }
-
-        throw new MessageFormatException("Unsupported type for value " + value.getClass());
-    }
-
-    // JMS VIOLATION ?
-    // I am not sure if getting and setting standard properties is allowed via the generic
-    // get/set methods : the spec seems unclear on it.
-    // Some javadocs seem to indicate it is NOT allowed. Hence this check ...
-    // If it is allowed in JMS - to support it, we will need to have a if/else block within each set/get
-    // which delegates to corresponding jms header set/get ...
-    private void checkIfStandardProperty(String key) throws JMSException {
-        if (standardProperties.contains(key))
-          throw new JMSException("Cannot get/set standard JMS properties using *Property api");
-    }
-
-    @Override
-    public void acknowledge() throws JMSException {
-        session.acknowledge(this);
-    }
-
-    @Override
-    public void clearBody() throws JMSException {
-        // Clear the body of the message.
-    }
-
-    public String getSourceName() {
-        return sourceName;
-    }
-
-    public String getSubscriberId() {
-        return subscriberId;
-    }
-
-    MessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId) throws JMSException {
-        if (MessageImpl.class != getClass()) {
-            throw new JMSException("Unexpected to call MessageImpl's createClone from subclass " + getClass());
-        }
-        return new MessageImpl(session, this, sourceTopicName, subscriberId);
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("MessageImpl");
-        sb.append("{session=").append(session);
-        sb.append(", jmsMessageId='").append(jmsMessageId).append('\'');
-        sb.append(", jmsTimestamp=").append(jmsTimestamp);
-        sb.append(", jmsCorrelationID='").append(jmsCorrelationID).append('\'');
-        sb.append(", jmsReplyTo=").append(jmsReplyTo);
-        sb.append(", jmsDestination=").append(jmsDestination);
-        sb.append(", jmsDeliveryMode=").append(jmsDeliveryMode);
-        sb.append(", jmsRedelivered=").append(jmsRedelivered);
-        sb.append(", jmsType='").append(jmsType).append('\'');
-        sb.append(", jmsExpiration=").append(jmsExpiration);
-        sb.append(", jmsPriority=").append(jmsPriority);
-        sb.append(", properties=").append(properties);
-        sb.append(", standardPropertiesExists=").append(standardPropertiesExists);
-        sb.append(", standardPropertiesExistsForWire=").append(standardPropertiesExistsForWire);
-        sb.append(", sourceName='").append(sourceName).append('\'');
-        sb.append(", subscriberId='").append(subscriberId).append('\'');
-        sb.append('}');
-        return sb.toString();
-    }
-
-    void reset() throws JMSException {
-        // noop ... children will override to do needful.
-    }
-
-    public Runnable getAckRunnable() {
-        return ackRunnable;
-    }
-}


Mime
View raw message