commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t.@apache.org
Subject svn commit: r1570993 [5/24] - in /commons/proper/jcs/trunk/src: assembly/ changes/ conf/ java/org/apache/commons/jcs/access/ java/org/apache/commons/jcs/access/exception/ java/org/apache/commons/jcs/admin/ java/org/apache/commons/jcs/auxiliary/ java/or...
Date Sun, 23 Feb 2014 10:37:52 GMT
Modified: commons/proper/jcs/trunk/src/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteAuxiliaryCache.java
URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/src/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteAuxiliaryCache.java?rev=1570993&r1=1570992&r2=1570993&view=diff
==============================================================================
--- commons/proper/jcs/trunk/src/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteAuxiliaryCache.java (original)
+++ commons/proper/jcs/trunk/src/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteAuxiliaryCache.java Sun Feb 23 10:37:48 2014
@@ -1,731 +1,731 @@
-package org.apache.commons.jcs.auxiliary.remote;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
-import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
-import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
-import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
-import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
-import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType;
-import org.apache.commons.jcs.engine.CacheStatus;
-import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
-import org.apache.commons.jcs.engine.behavior.ICacheElement;
-import org.apache.commons.jcs.engine.behavior.ICacheElementSerialized;
-import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
-import org.apache.commons.jcs.engine.behavior.IZombie;
-import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
-import org.apache.commons.jcs.engine.stats.StatElement;
-import org.apache.commons.jcs.engine.stats.Stats;
-import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
-import org.apache.commons.jcs.engine.stats.behavior.IStats;
-import org.apache.commons.jcs.utils.serialization.SerializationConversionUtil;
-import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/** Abstract base for remote caches. I'm trying to break out and reuse common functionality. */
-public abstract class AbstractRemoteAuxiliaryCache<K extends Serializable, V extends Serializable>
-    extends AbstractAuxiliaryCacheEventLogging<K, V>
-    implements IRemoteCacheClient<K, V>
-{
-    /** Don't change. */
-    private static final long serialVersionUID = -5329231850422826461L;
-
-    /** The logger. */
-    private final static Log log = LogFactory.getLog( AbstractRemoteAuxiliaryCache.class );
-
-    /**
-     * This does the work. In an RMI instances, it will be a remote reference. In an http remote
-     * cache it will be an http client. In zombie mode it is replaced with a balking facade.
-     */
-    private ICacheServiceNonLocal<K, V> remoteCacheService;
-
-    /** The cacheName */
-    protected final String cacheName;
-
-    /** The listener. This can be null. */
-    private IRemoteCacheListener<K, V> remoteCacheListener;
-
-    /** The configuration values. TODO, we'll need a base here. */
-    private IRemoteCacheAttributes remoteCacheAttributes;
-
-    /** A thread pool for gets if configured. */
-    private ThreadPoolExecutor pool = null;
-
-    /** Should we get asynchronously using a pool. */
-    private boolean usePoolForGet = false;
-
-    /**
-     * Creates the base.
-     * <p>
-     * @param cattr
-     * @param remote
-     * @param listener
-     */
-    public AbstractRemoteAuxiliaryCache( IRemoteCacheAttributes cattr, ICacheServiceNonLocal<K, V> remote,
-                                         IRemoteCacheListener<K, V> listener )
-    {
-        this.setRemoteCacheAttributes( cattr );
-        this.cacheName = cattr.getCacheName();
-        this.setRemoteCacheService( remote );
-        this.setRemoteCacheListener( listener );
-
-        if ( log.isDebugEnabled() )
-        {
-            log.debug( "Construct> cacheName=" + cattr.getCacheName() );
-            log.debug( "irca = " + getRemoteCacheAttributes() );
-            log.debug( "remote = " + remote );
-            log.debug( "listener = " + listener );
-        }
-
-        // use a pool if it is greater than 0
-        if ( log.isDebugEnabled() )
-        {
-            log.debug( "GetTimeoutMillis() = " + getRemoteCacheAttributes().getGetTimeoutMillis() );
-        }
-
-        if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 )
-        {
-            pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() );
-            if ( log.isDebugEnabled() )
-            {
-                log.debug( "Thread Pool = " + pool );
-            }
-            if ( pool != null )
-            {
-                usePoolForGet = true;
-            }
-        }
-    }
-
-    /**
-     * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie.
-     * <p>
-     * @throws IOException
-     */
-    @Override
-    protected void processDispose()
-        throws IOException
-    {
-        if ( log.isInfoEnabled() )
-        {
-            log.info( "Disposing of remote cache." );
-        }
-        try
-        {
-            if ( getRemoteCacheListener() != null )
-            {
-                getRemoteCacheListener().dispose();
-            }
-        }
-        catch ( Exception ex )
-        {
-            log.error( "Couldn't dispose", ex );
-            handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
-        }
-    }
-
-    /**
-     * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie.
-     * <p>
-     * Use threadpool to timeout if a value is set for GetTimeoutMillis
-     * <p>
-     * If we are a cluster client, we need to leave the Element in its serialized form. Cluster
-     * clients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from
-     * other remote servers.
-     * <p>
-     * @param key
-     * @return ICacheElement, a wrapper around the key, value, and attributes
-     * @throws IOException
-     */
-    @Override
-    protected ICacheElement<K, V> processGet( K key )
-        throws IOException
-    {
-        ICacheElement<K, V> retVal = null;
-        try
-        {
-            if ( usePoolForGet )
-            {
-                retVal = getUsingPool( key );
-            }
-            else
-            {
-                retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
-            }
-
-            // Eventually the instance of will not be necessary.
-            if ( retVal != null && retVal instanceof ICacheElementSerialized )
-            {
-                // Never try to deserialize if you are a cluster client. Cluster
-                // clients are merely intra-remote cache communicators. Remote caches are assumed
-                // to have no ability to deserialize the objects.
-                if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
-                {
-                    retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal,
-                                                                                      this.elementSerializer );
-                }
-            }
-        }
-        catch ( Exception ex )
-        {
-            handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
-        }
-        return retVal;
-    }
-
-    /**
-     * This allows gets to timeout in case of remote server machine shutdown.
-     * <p>
-     * @param key
-     * @return ICacheElement
-     * @throws IOException
-     */
-    public ICacheElement<K, V> getUsingPool( final K key )
-        throws IOException
-    {
-        int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
-
-        try
-        {
-            Callable<ICacheElement<K, V>> command = new Callable<ICacheElement<K, V>>()
-            {
-                public ICacheElement<K, V> call()
-                    throws IOException
-                {
-                    return getRemoteCacheService().get( cacheName, key, getListenerId() );
-                }
-            };
-
-            // execute using the pool
-            Future<ICacheElement<K, V>> future = pool.submit(command);
-
-            // used timed get in order to timeout
-            ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS);
-
-            if ( log.isDebugEnabled() )
-            {
-                if ( ice == null )
-                {
-                    log.debug( "nothing found in remote cache" );
-                }
-                else
-                {
-                    log.debug( "found item in remote cache" );
-                }
-            }
-            return ice;
-        }
-        catch ( TimeoutException te )
-        {
-            log.warn( "TimeoutException, Get Request timed out after " + timeout );
-            throw new IOException( "Get Request timed out after " + timeout );
-        }
-        catch ( InterruptedException ex )
-        {
-            log.warn( "InterruptedException, Get Request timed out after " + timeout );
-            throw new IOException( "Get Request timed out after " + timeout );
-        }
-        catch (ExecutionException ex)
-        {
-            // assume that this is an IOException thrown by the callable.
-            log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
-            throw new IOException( "Get Request timed out after " + timeout );
-        }
-    }
-
-    /**
-     * Calls get matching on the server. Each entry in the result is unwrapped.
-     * <p>
-     * @param pattern
-     * @return Map
-     * @throws IOException
-     */
-    @Override
-    public Map<K, ICacheElement<K, V>> processGetMatching( String pattern )
-        throws IOException
-    {
-        Map<K, ICacheElement<K, V>> results = new HashMap<K, ICacheElement<K, V>>();
-        try
-        {
-            Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
-
-            // Eventually the instance of will not be necessary.
-            if ( rawResults != null )
-            {
-                for (Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet())
-                {
-                    ICacheElement<K, V> unwrappedResult = null;
-                    if ( entry.getValue() instanceof ICacheElementSerialized )
-                    {
-                        // Never try to deserialize if you are a cluster client. Cluster
-                        // clients are merely intra-remote cache communicators. Remote caches are assumed
-                        // to have no ability to deserialize the objects.
-                        if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
-                        {
-                            unwrappedResult = SerializationConversionUtil
-                                .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(),
-                                                              this.elementSerializer );
-                        }
-                    }
-                    else
-                    {
-                        unwrappedResult = entry.getValue();
-                    }
-                    results.put( entry.getKey(), unwrappedResult );
-                }
-            }
-        }
-        catch ( Exception ex )
-        {
-            handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
-                             ICacheEventLogger.GET_EVENT );
-        }
-        return results;
-    }
-
-    /**
-     * Gets multiple items from the cache based on the given set of keys.
-     * <p>
-     * @param keys
-     * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no
-     *         data in cache for any of these keys
-     * @throws IOException
-     */
-    @Override
-    protected Map<K, ICacheElement<K, V>> processGetMultiple( Set<K> keys )
-        throws IOException
-    {
-        Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
-        if ( keys != null && !keys.isEmpty() )
-        {
-            for (K key : keys)
-            {
-                ICacheElement<K, V> element = get( key );
-
-                if ( element != null )
-                {
-                    elements.put( key, element );
-                }
-            }
-        }
-        return elements;
-    }
-
-    /**
-     * Synchronously remove from the remote cache; if failed, replace the remote handle with a
-     * zombie.
-     * <p>
-     * @param key
-     * @return boolean, whether or not the item was removed
-     * @throws IOException
-     */
-    @Override
-    protected boolean processRemove( K key )
-        throws IOException
-    {
-        if ( !this.getRemoteCacheAttributes().getGetOnly() )
-        {
-            if ( log.isDebugEnabled() )
-            {
-                log.debug( "remove> key=" + key );
-            }
-            try
-            {
-                getRemoteCacheService().remove( cacheName, key, getListenerId() );
-            }
-            catch ( Exception ex )
-            {
-                handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
-            }
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
-     * zombie.
-     * <p>
-     * @throws IOException
-     */
-    @Override
-    protected void processRemoveAll()
-        throws IOException
-    {
-        if ( !this.getRemoteCacheAttributes().getGetOnly() )
-        {
-            try
-            {
-                getRemoteCacheService().removeAll( cacheName, getListenerId() );
-            }
-            catch ( Exception ex )
-            {
-                handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
-            }
-        }
-    }
-
-    /**
-     * Serializes the object and then calls update on the remote server with the byte array. The
-     * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate
-     * without any knowledge of caches classes.
-     * <p>
-     * @param ce
-     * @throws IOException
-     */
-    @Override
-    protected void processUpdate( ICacheElement<K, V> ce )
-        throws IOException
-    {
-        if ( !getRemoteCacheAttributes().getGetOnly() )
-        {
-            ICacheElementSerialized<K, V> serialized = null;
-            try
-            {
-                if ( log.isDebugEnabled() )
-                {
-                    log.debug( "sending item to remote server" );
-                }
-
-                // convert so we don't have to know about the object on the
-                // other end.
-                serialized = SerializationConversionUtil.getSerializedCacheElement( ce, this.elementSerializer );
-
-                remoteCacheService.update( serialized, getListenerId() );
-            }
-            catch ( NullPointerException npe )
-            {
-                log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe );
-            }
-            catch ( Exception ex )
-            {
-                // event queue will wait and retry
-                handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
-                                 ICacheEventLogger.UPDATE_EVENT );
-            }
-        }
-        else
-        {
-            if ( log.isDebugEnabled() )
-            {
-                log.debug( "get only mode, not sending to remote server" );
-            }
-        }
-    }
-
-    /**
-     * Return the keys in this cache.
-     * <p>
-     * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet()
-     */
-    public Set<K> getKeySet()
-        throws java.rmi.RemoteException, IOException
-    {
-        return getRemoteCacheService().getKeySet(cacheName);
-    }
-
-    /**
-     * Allows other member of this package to access the listener. This is mainly needed for
-     * deregistering a listener.
-     * <p>
-     * @return IRemoteCacheListener, the listener for this remote server
-     */
-    public IRemoteCacheListener<K, V> getListener()
-    {
-        return getRemoteCacheListener();
-    }
-
-    /**
-     * let the remote cache set a listener_id. Since there is only one listener for all the regions
-     * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
-     * assume that it is a reconnect.
-     * <p>
-     * @param id The new listenerId value
-     */
-    public void setListenerId( long id )
-    {
-        if ( getRemoteCacheListener() != null )
-        {
-            try
-            {
-                getRemoteCacheListener().setListenerId( id );
-
-                if ( log.isDebugEnabled() )
-                {
-                    log.debug( "set listenerId = " + id );
-                }
-            }
-            catch ( Exception e )
-            {
-                log.error( "Problem setting listenerId", e );
-            }
-        }
-    }
-
-    /**
-     * Gets the listenerId attribute of the RemoteCacheListener object
-     * <p>
-     * @return The listenerId value
-     */
-    public long getListenerId()
-    {
-        if ( getRemoteCacheListener() != null )
-        {
-            try
-            {
-                if ( log.isDebugEnabled() )
-                {
-                    log.debug( "get listenerId = " + getRemoteCacheListener().getListenerId() );
-                }
-                return getRemoteCacheListener().getListenerId();
-            }
-            catch ( Exception e )
-            {
-                log.error( "Problem getting listenerId", e );
-            }
-        }
-        return -1;
-    }
-
-    /**
-     * Returns the current cache size.
-     * @return The size value
-     */
-    public int getSize()
-    {
-        return 0;
-    }
-
-    /**
-     * Custom exception handling some children.  This should be used to initiate failover.
-     * <p>
-     * @param ex
-     * @param msg
-     * @param eventName
-     * @throws IOException
-     */
-    protected abstract void handleException( Exception ex, String msg, String eventName )
-        throws IOException;
-
-    /**
-     * Gets the stats attribute of the RemoteCache object.
-     * <p>
-     * @return The stats value
-     */
-    public String getStats()
-    {
-        return getStatistics().toString();
-    }
-
-    /**
-     * @return IStats object
-     */
-    public IStats getStatistics()
-    {
-        IStats stats = new Stats();
-        stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
-
-        ArrayList<IStatElement> elems = new ArrayList<IStatElement>();
-
-        IStatElement se = null;
-
-        se = new StatElement();
-        se.setName( "Remote Type" );
-        se.setData( this.getRemoteCacheAttributes().getRemoteTypeName() + "" );
-        elems.add( se );
-
-        if ( this.getRemoteCacheAttributes().getRemoteType() == RemoteType.CLUSTER )
-        {
-            // something cluster specific
-        }
-
-        // no data gathered here
-
-        se = new StatElement();
-        se.setName( "UsePoolForGet" );
-        se.setData( "" + usePoolForGet );
-        elems.add( se );
-
-        if ( pool != null )
-        {
-            se = new StatElement();
-            se.setName( "Pool Size" );
-            se.setData( "" + pool.getPoolSize() );
-            elems.add( se );
-
-            se = new StatElement();
-            se.setName( "Maximum Pool Size" );
-            se.setData( "" + pool.getMaximumPoolSize() );
-            elems.add( se );
-        }
-
-        if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
-        {
-            se = new StatElement();
-            se.setName( "Zombie Queue Size" );
-            se.setData( "" + ( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize() );
-            elems.add( se );
-        }
-
-        // get an array and put them in the Stats object
-        IStatElement[] ses = elems.toArray( new StatElement[0] );
-        stats.setStatElements( ses );
-
-        return stats;
-    }
-
-    /**
-     * Returns the cache status. An error status indicates the remote connection is not available.
-     * <p>
-     * @return The status value
-     */
-    public CacheStatus getStatus()
-    {
-        return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
-    }
-
-    /**
-     * Replaces the current remote cache service handle with the given handle. If the current remote
-     * is a Zombie, then it propagates any events that are queued to the restored service.
-     * <p>
-     * @param restoredRemote ICacheServiceNonLocal -- the remote server or proxy to the remote server
-     */
-    public void fixCache( ICacheServiceNonLocal<?, ?> restoredRemote )
-    {
-        @SuppressWarnings("unchecked") // Don't know how to do this properly
-        ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote;
-        if ( getRemoteCacheService() != null && getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
-        {
-            ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService();
-            setRemoteCacheService( remote );
-            try
-            {
-                zombie.propagateEvents( remote );
-            }
-            catch ( Exception e )
-            {
-                try
-                {
-                    handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
-                                     "fixCache" );
-                }
-                catch ( IOException e1 )
-                {
-                    // swallow, since this is just expected kick back.  Handle always throws
-                }
-            }
-        }
-        else
-        {
-            setRemoteCacheService( remote );
-        }
-    }
-
-
-    /**
-     * Gets the cacheType attribute of the RemoteCache object
-     * @return The cacheType value
-     */
-    public CacheType getCacheType()
-    {
-        return CacheType.REMOTE_CACHE;
-    }
-
-    /**
-     * Gets the cacheName attribute of the RemoteCache object.
-     * <p>
-     * @return The cacheName value
-     */
-    public String getCacheName()
-    {
-        return cacheName;
-    }
-
-    /**
-     * @param remote the remote to set
-     */
-    protected void setRemoteCacheService( ICacheServiceNonLocal<K, V> remote )
-    {
-        this.remoteCacheService = remote;
-    }
-
-    /**
-     * @return the remote
-     */
-    protected ICacheServiceNonLocal<K, V> getRemoteCacheService()
-    {
-        return remoteCacheService;
-    }
-
-    /**
-     * @return Returns the AuxiliaryCacheAttributes.
-     */
-    public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
-    {
-        return getRemoteCacheAttributes();
-    }
-
-    /**
-     * @param remoteCacheAttributes the remoteCacheAttributes to set
-     */
-    protected void setRemoteCacheAttributes( IRemoteCacheAttributes remoteCacheAttributes )
-    {
-        this.remoteCacheAttributes = remoteCacheAttributes;
-    }
-
-    /**
-     * @return the remoteCacheAttributes
-     */
-    protected IRemoteCacheAttributes getRemoteCacheAttributes()
-    {
-        return remoteCacheAttributes;
-    }
-
-    /**
-     * @param remoteCacheListener the remoteCacheListener to set
-     */
-    protected void setRemoteCacheListener( IRemoteCacheListener<K, V> remoteCacheListener )
-    {
-        this.remoteCacheListener = remoteCacheListener;
-    }
-
-    /**
-     * @return the remoteCacheListener
-     */
-    protected IRemoteCacheListener<K, V> getRemoteCacheListener()
-    {
-        return remoteCacheListener;
-    }
-}
+package org.apache.commons.jcs.auxiliary.remote;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
+import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
+import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
+import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
+import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
+import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType;
+import org.apache.commons.jcs.engine.CacheStatus;
+import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
+import org.apache.commons.jcs.engine.behavior.ICacheElement;
+import org.apache.commons.jcs.engine.behavior.ICacheElementSerialized;
+import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
+import org.apache.commons.jcs.engine.behavior.IZombie;
+import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
+import org.apache.commons.jcs.engine.stats.StatElement;
+import org.apache.commons.jcs.engine.stats.Stats;
+import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
+import org.apache.commons.jcs.engine.stats.behavior.IStats;
+import org.apache.commons.jcs.utils.serialization.SerializationConversionUtil;
+import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Abstract base for remote caches. I'm trying to break out and reuse common functionality. */
+public abstract class AbstractRemoteAuxiliaryCache<K extends Serializable, V extends Serializable>
+    extends AbstractAuxiliaryCacheEventLogging<K, V>
+    implements IRemoteCacheClient<K, V>
+{
+    /** Don't change. */
+    private static final long serialVersionUID = -5329231850422826461L;
+
+    /** The logger. */
+    private final static Log log = LogFactory.getLog( AbstractRemoteAuxiliaryCache.class );
+
+    /**
+     * This does the work. In an RMI instances, it will be a remote reference. In an http remote
+     * cache it will be an http client. In zombie mode it is replaced with a balking facade.
+     */
+    private ICacheServiceNonLocal<K, V> remoteCacheService;
+
+    /** The cacheName */
+    protected final String cacheName;
+
+    /** The listener. This can be null. */
+    private IRemoteCacheListener<K, V> remoteCacheListener;
+
+    /** The configuration values. TODO, we'll need a base here. */
+    private IRemoteCacheAttributes remoteCacheAttributes;
+
+    /** A thread pool for gets if configured. */
+    private ThreadPoolExecutor pool = null;
+
+    /** Should we get asynchronously using a pool. */
+    private boolean usePoolForGet = false;
+
+    /**
+     * Creates the base.
+     * <p>
+     * @param cattr
+     * @param remote
+     * @param listener
+     */
+    public AbstractRemoteAuxiliaryCache( IRemoteCacheAttributes cattr, ICacheServiceNonLocal<K, V> remote,
+                                         IRemoteCacheListener<K, V> listener )
+    {
+        this.setRemoteCacheAttributes( cattr );
+        this.cacheName = cattr.getCacheName();
+        this.setRemoteCacheService( remote );
+        this.setRemoteCacheListener( listener );
+
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "Construct> cacheName=" + cattr.getCacheName() );
+            log.debug( "irca = " + getRemoteCacheAttributes() );
+            log.debug( "remote = " + remote );
+            log.debug( "listener = " + listener );
+        }
+
+        // use a pool if it is greater than 0
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "GetTimeoutMillis() = " + getRemoteCacheAttributes().getGetTimeoutMillis() );
+        }
+
+        if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 )
+        {
+            pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() );
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "Thread Pool = " + pool );
+            }
+            if ( pool != null )
+            {
+                usePoolForGet = true;
+            }
+        }
+    }
+
+    /**
+     * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie.
+     * <p>
+     * @throws IOException
+     */
+    @Override
+    protected void processDispose()
+        throws IOException
+    {
+        if ( log.isInfoEnabled() )
+        {
+            log.info( "Disposing of remote cache." );
+        }
+        try
+        {
+            if ( getRemoteCacheListener() != null )
+            {
+                getRemoteCacheListener().dispose();
+            }
+        }
+        catch ( Exception ex )
+        {
+            log.error( "Couldn't dispose", ex );
+            handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
+        }
+    }
+
+    /**
+     * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie.
+     * <p>
+     * Use threadpool to timeout if a value is set for GetTimeoutMillis
+     * <p>
+     * If we are a cluster client, we need to leave the Element in its serialized form. Cluster
+     * clients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from
+     * other remote servers.
+     * <p>
+     * @param key
+     * @return ICacheElement, a wrapper around the key, value, and attributes
+     * @throws IOException
+     */
+    @Override
+    protected ICacheElement<K, V> processGet( K key )
+        throws IOException
+    {
+        ICacheElement<K, V> retVal = null;
+        try
+        {
+            if ( usePoolForGet )
+            {
+                retVal = getUsingPool( key );
+            }
+            else
+            {
+                retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
+            }
+
+            // Eventually the instance of will not be necessary.
+            if ( retVal != null && retVal instanceof ICacheElementSerialized )
+            {
+                // Never try to deserialize if you are a cluster client. Cluster
+                // clients are merely intra-remote cache communicators. Remote caches are assumed
+                // to have no ability to deserialize the objects.
+                if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
+                {
+                    retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal,
+                                                                                      this.elementSerializer );
+                }
+            }
+        }
+        catch ( Exception ex )
+        {
+            handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
+        }
+        return retVal;
+    }
+
+    /**
+     * This allows gets to timeout in case of remote server machine shutdown.
+     * <p>
+     * @param key
+     * @return ICacheElement
+     * @throws IOException
+     */
+    public ICacheElement<K, V> getUsingPool( final K key )
+        throws IOException
+    {
+        int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
+
+        try
+        {
+            Callable<ICacheElement<K, V>> command = new Callable<ICacheElement<K, V>>()
+            {
+                public ICacheElement<K, V> call()
+                    throws IOException
+                {
+                    return getRemoteCacheService().get( cacheName, key, getListenerId() );
+                }
+            };
+
+            // execute using the pool
+            Future<ICacheElement<K, V>> future = pool.submit(command);
+
+            // used timed get in order to timeout
+            ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS);
+
+            if ( log.isDebugEnabled() )
+            {
+                if ( ice == null )
+                {
+                    log.debug( "nothing found in remote cache" );
+                }
+                else
+                {
+                    log.debug( "found item in remote cache" );
+                }
+            }
+            return ice;
+        }
+        catch ( TimeoutException te )
+        {
+            log.warn( "TimeoutException, Get Request timed out after " + timeout );
+            throw new IOException( "Get Request timed out after " + timeout );
+        }
+        catch ( InterruptedException ex )
+        {
+            log.warn( "InterruptedException, Get Request timed out after " + timeout );
+            throw new IOException( "Get Request timed out after " + timeout );
+        }
+        catch (ExecutionException ex)
+        {
+            // assume that this is an IOException thrown by the callable.
+            log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
+            throw new IOException( "Get Request timed out after " + timeout );
+        }
+    }
+
+    /**
+     * Calls get matching on the server. Each entry in the result is unwrapped.
+     * <p>
+     * @param pattern
+     * @return Map
+     * @throws IOException
+     */
+    @Override
+    public Map<K, ICacheElement<K, V>> processGetMatching( String pattern )
+        throws IOException
+    {
+        Map<K, ICacheElement<K, V>> results = new HashMap<K, ICacheElement<K, V>>();
+        try
+        {
+            Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
+
+            // Eventually the instance of will not be necessary.
+            if ( rawResults != null )
+            {
+                for (Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet())
+                {
+                    ICacheElement<K, V> unwrappedResult = null;
+                    if ( entry.getValue() instanceof ICacheElementSerialized )
+                    {
+                        // Never try to deserialize if you are a cluster client. Cluster
+                        // clients are merely intra-remote cache communicators. Remote caches are assumed
+                        // to have no ability to deserialize the objects.
+                        if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
+                        {
+                            unwrappedResult = SerializationConversionUtil
+                                .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(),
+                                                              this.elementSerializer );
+                        }
+                    }
+                    else
+                    {
+                        unwrappedResult = entry.getValue();
+                    }
+                    results.put( entry.getKey(), unwrappedResult );
+                }
+            }
+        }
+        catch ( Exception ex )
+        {
+            handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
+                             ICacheEventLogger.GET_EVENT );
+        }
+        return results;
+    }
+
+    /**
+     * Gets multiple items from the cache based on the given set of keys.
+     * <p>
+     * @param keys
+     * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no
+     *         data in cache for any of these keys
+     * @throws IOException
+     */
+    @Override
+    protected Map<K, ICacheElement<K, V>> processGetMultiple( Set<K> keys )
+        throws IOException
+    {
+        Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
+        if ( keys != null && !keys.isEmpty() )
+        {
+            for (K key : keys)
+            {
+                ICacheElement<K, V> element = get( key );
+
+                if ( element != null )
+                {
+                    elements.put( key, element );
+                }
+            }
+        }
+        return elements;
+    }
+
+    /**
+     * Synchronously remove from the remote cache; if failed, replace the remote handle with a
+     * zombie.
+     * <p>
+     * @param key
+     * @return boolean, whether or not the item was removed
+     * @throws IOException
+     */
+    @Override
+    protected boolean processRemove( K key )
+        throws IOException
+    {
+        if ( !this.getRemoteCacheAttributes().getGetOnly() )
+        {
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "remove> key=" + key );
+            }
+            try
+            {
+                getRemoteCacheService().remove( cacheName, key, getListenerId() );
+            }
+            catch ( Exception ex )
+            {
+                handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
+     * zombie.
+     * <p>
+     * @throws IOException
+     */
+    @Override
+    protected void processRemoveAll()
+        throws IOException
+    {
+        if ( !this.getRemoteCacheAttributes().getGetOnly() )
+        {
+            try
+            {
+                getRemoteCacheService().removeAll( cacheName, getListenerId() );
+            }
+            catch ( Exception ex )
+            {
+                handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
+            }
+        }
+    }
+
+    /**
+     * Serializes the object and then calls update on the remote server with the byte array. The
+     * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate
+     * without any knowledge of caches classes.
+     * <p>
+     * @param ce
+     * @throws IOException
+     */
+    @Override
+    protected void processUpdate( ICacheElement<K, V> ce )
+        throws IOException
+    {
+        if ( !getRemoteCacheAttributes().getGetOnly() )
+        {
+            ICacheElementSerialized<K, V> serialized = null;
+            try
+            {
+                if ( log.isDebugEnabled() )
+                {
+                    log.debug( "sending item to remote server" );
+                }
+
+                // convert so we don't have to know about the object on the
+                // other end.
+                serialized = SerializationConversionUtil.getSerializedCacheElement( ce, this.elementSerializer );
+
+                remoteCacheService.update( serialized, getListenerId() );
+            }
+            catch ( NullPointerException npe )
+            {
+                log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe );
+            }
+            catch ( Exception ex )
+            {
+                // event queue will wait and retry
+                handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
+                                 ICacheEventLogger.UPDATE_EVENT );
+            }
+        }
+        else
+        {
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "get only mode, not sending to remote server" );
+            }
+        }
+    }
+
+    /**
+     * Return the keys in this cache.
+     * <p>
+     * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet()
+     */
+    public Set<K> getKeySet()
+        throws java.rmi.RemoteException, IOException
+    {
+        return getRemoteCacheService().getKeySet(cacheName);
+    }
+
+    /**
+     * Allows other member of this package to access the listener. This is mainly needed for
+     * deregistering a listener.
+     * <p>
+     * @return IRemoteCacheListener, the listener for this remote server
+     */
+    public IRemoteCacheListener<K, V> getListener()
+    {
+        return getRemoteCacheListener();
+    }
+
+    /**
+     * let the remote cache set a listener_id. Since there is only one listener for all the regions
+     * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
+     * assume that it is a reconnect.
+     * <p>
+     * @param id The new listenerId value
+     */
+    public void setListenerId( long id )
+    {
+        if ( getRemoteCacheListener() != null )
+        {
+            try
+            {
+                getRemoteCacheListener().setListenerId( id );
+
+                if ( log.isDebugEnabled() )
+                {
+                    log.debug( "set listenerId = " + id );
+                }
+            }
+            catch ( Exception e )
+            {
+                log.error( "Problem setting listenerId", e );
+            }
+        }
+    }
+
+    /**
+     * Gets the listenerId attribute of the RemoteCacheListener object
+     * <p>
+     * @return The listenerId value
+     */
+    public long getListenerId()
+    {
+        if ( getRemoteCacheListener() != null )
+        {
+            try
+            {
+                if ( log.isDebugEnabled() )
+                {
+                    log.debug( "get listenerId = " + getRemoteCacheListener().getListenerId() );
+                }
+                return getRemoteCacheListener().getListenerId();
+            }
+            catch ( Exception e )
+            {
+                log.error( "Problem getting listenerId", e );
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Returns the current cache size.
+     * @return The size value
+     */
+    public int getSize()
+    {
+        return 0;
+    }
+
+    /**
+     * Custom exception handling some children.  This should be used to initiate failover.
+     * <p>
+     * @param ex
+     * @param msg
+     * @param eventName
+     * @throws IOException
+     */
+    protected abstract void handleException( Exception ex, String msg, String eventName )
+        throws IOException;
+
+    /**
+     * Gets the stats attribute of the RemoteCache object.
+     * <p>
+     * @return The stats value
+     */
+    public String getStats()
+    {
+        return getStatistics().toString();
+    }
+
+    /**
+     * @return IStats object
+     */
+    public IStats getStatistics()
+    {
+        IStats stats = new Stats();
+        stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
+
+        ArrayList<IStatElement> elems = new ArrayList<IStatElement>();
+
+        IStatElement se = null;
+
+        se = new StatElement();
+        se.setName( "Remote Type" );
+        se.setData( this.getRemoteCacheAttributes().getRemoteTypeName() + "" );
+        elems.add( se );
+
+        if ( this.getRemoteCacheAttributes().getRemoteType() == RemoteType.CLUSTER )
+        {
+            // something cluster specific
+        }
+
+        // no data gathered here
+
+        se = new StatElement();
+        se.setName( "UsePoolForGet" );
+        se.setData( "" + usePoolForGet );
+        elems.add( se );
+
+        if ( pool != null )
+        {
+            se = new StatElement();
+            se.setName( "Pool Size" );
+            se.setData( "" + pool.getPoolSize() );
+            elems.add( se );
+
+            se = new StatElement();
+            se.setName( "Maximum Pool Size" );
+            se.setData( "" + pool.getMaximumPoolSize() );
+            elems.add( se );
+        }
+
+        if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
+        {
+            se = new StatElement();
+            se.setName( "Zombie Queue Size" );
+            se.setData( "" + ( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize() );
+            elems.add( se );
+        }
+
+        // get an array and put them in the Stats object
+        IStatElement[] ses = elems.toArray( new StatElement[0] );
+        stats.setStatElements( ses );
+
+        return stats;
+    }
+
+    /**
+     * Returns the cache status. An error status indicates the remote connection is not available.
+     * <p>
+     * @return The status value
+     */
+    public CacheStatus getStatus()
+    {
+        return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
+    }
+
+    /**
+     * Replaces the current remote cache service handle with the given handle. If the current remote
+     * is a Zombie, then it propagates any events that are queued to the restored service.
+     * <p>
+     * @param restoredRemote ICacheServiceNonLocal -- the remote server or proxy to the remote server
+     */
+    public void fixCache( ICacheServiceNonLocal<?, ?> restoredRemote )
+    {
+        @SuppressWarnings("unchecked") // Don't know how to do this properly
+        ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote;
+        if ( getRemoteCacheService() != null && getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
+        {
+            ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService();
+            setRemoteCacheService( remote );
+            try
+            {
+                zombie.propagateEvents( remote );
+            }
+            catch ( Exception e )
+            {
+                try
+                {
+                    handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
+                                     "fixCache" );
+                }
+                catch ( IOException e1 )
+                {
+                    // swallow, since this is just expected kick back.  Handle always throws
+                }
+            }
+        }
+        else
+        {
+            setRemoteCacheService( remote );
+        }
+    }
+
+
+    /**
+     * Gets the cacheType attribute of the RemoteCache object
+     * @return The cacheType value
+     */
+    public CacheType getCacheType()
+    {
+        return CacheType.REMOTE_CACHE;
+    }
+
+    /**
+     * Gets the cacheName attribute of the RemoteCache object.
+     * <p>
+     * @return The cacheName value
+     */
+    public String getCacheName()
+    {
+        return cacheName;
+    }
+
+    /**
+     * @param remote the remote to set
+     */
+    protected void setRemoteCacheService( ICacheServiceNonLocal<K, V> remote )
+    {
+        this.remoteCacheService = remote;
+    }
+
+    /**
+     * @return the remote
+     */
+    protected ICacheServiceNonLocal<K, V> getRemoteCacheService()
+    {
+        return remoteCacheService;
+    }
+
+    /**
+     * @return Returns the AuxiliaryCacheAttributes.
+     */
+    public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
+    {
+        return getRemoteCacheAttributes();
+    }
+
+    /**
+     * @param remoteCacheAttributes the remoteCacheAttributes to set
+     */
+    protected void setRemoteCacheAttributes( IRemoteCacheAttributes remoteCacheAttributes )
+    {
+        this.remoteCacheAttributes = remoteCacheAttributes;
+    }
+
+    /**
+     * @return the remoteCacheAttributes
+     */
+    protected IRemoteCacheAttributes getRemoteCacheAttributes()
+    {
+        return remoteCacheAttributes;
+    }
+
+    /**
+     * @param remoteCacheListener the remoteCacheListener to set
+     */
+    protected void setRemoteCacheListener( IRemoteCacheListener<K, V> remoteCacheListener )
+    {
+        this.remoteCacheListener = remoteCacheListener;
+    }
+
+    /**
+     * @return the remoteCacheListener
+     */
+    protected IRemoteCacheListener<K, V> getRemoteCacheListener()
+    {
+        return remoteCacheListener;
+    }
+}

Modified: commons/proper/jcs/trunk/src/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteCacheListener.java
URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/src/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteCacheListener.java?rev=1570993&r1=1570992&r2=1570993&view=diff
==============================================================================
--- commons/proper/jcs/trunk/src/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteCacheListener.java (original)
+++ commons/proper/jcs/trunk/src/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteCacheListener.java Sun Feb 23 10:37:48 2014
@@ -1,345 +1,345 @@
-package org.apache.commons.jcs.auxiliary.remote;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.UnknownHostException;
-
-import org.apache.commons.jcs.access.exception.CacheException;
-import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
-import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
-import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType;
-import org.apache.commons.jcs.engine.behavior.ICacheElement;
-import org.apache.commons.jcs.engine.behavior.ICacheElementSerialized;
-import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
-import org.apache.commons.jcs.engine.behavior.IElementSerializer;
-import org.apache.commons.jcs.engine.control.CompositeCache;
-import org.apache.commons.jcs.engine.control.CompositeCacheManager;
-import org.apache.commons.jcs.utils.net.HostNameUtil;
-import org.apache.commons.jcs.utils.serialization.SerializationConversionUtil;
-import org.apache.commons.jcs.utils.serialization.StandardSerializer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/** Shared listener base. */
-public abstract class AbstractRemoteCacheListener<K extends Serializable, V extends Serializable>
-    implements IRemoteCacheListener<K, V>, Serializable
-{
-    /** Don't change */
-    private static final long serialVersionUID = 32442324243243L;
-
-    /** The logger */
-    private final static Log log = LogFactory.getLog( AbstractRemoteCacheListener.class );
-
-    /** The cached name of the local host. The remote server gets this for logging purposes. */
-    private static String localHostName = null;
-
-    /** Has this client been shutdown. */
-    boolean disposed = false;
-
-    /**
-     * The cache manager used to put items in different regions. This is set lazily and should not
-     * be sent to the remote server.
-     */
-    protected transient ICompositeCacheManager cacheMgr;
-
-    /** The remote cache configuration object. */
-    protected IRemoteCacheAttributes irca;
-
-    /** Number of put requests received. For debugging only. */
-    protected int puts = 0;
-
-    /** Number of remove requests received. For debugging only. */
-    protected int removes = 0;
-
-    /** This is set by the remote cache server. */
-    protected long listenerId = 0;
-
-    /** Custom serializer. Standard by default. */
-    private transient IElementSerializer elementSerializer = new StandardSerializer();
-
-    /**
-     * Only need one since it does work for all regions, just reference by multiple region names.
-     * <p>
-     * The constructor exports this object, making it available to receive incoming calls. The
-     * callback port is anonymous unless a local port value was specified in the configuration.
-     * <p>
-     * @param irca
-     * @param cacheMgr
-     */
-    public AbstractRemoteCacheListener( IRemoteCacheAttributes irca, ICompositeCacheManager cacheMgr )
-    {
-        this.irca = irca;
-        this.cacheMgr = cacheMgr;
-    }
-
-    /**
-     * Let the remote cache set a listener_id. Since there is only one listener for all the regions
-     * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
-     * assume that it is a reconnect.
-     * <p>
-     * @param id The new listenerId value
-     * @throws IOException
-     */
-    public void setListenerId( long id )
-        throws IOException
-    {
-        listenerId = id;
-        if ( log.isInfoEnabled() )
-        {
-            log.info( "set listenerId = [" + id + "]" );
-        }
-    }
-
-    /**
-     * Gets the listenerId attribute of the RemoteCacheListener object. This is stored in the
-     * object. The RemoteCache object contains a reference to the listener and get the id this way.
-     * <p>
-     * @return The listenerId value
-     * @throws IOException
-     */
-    public long getListenerId()
-        throws IOException
-    {
-        if ( log.isDebugEnabled() )
-        {
-            log.debug( "get listenerId = [" + listenerId + "]" );
-        }
-        return listenerId;
-
-    }
-
-    /**
-     * Gets the remoteType attribute of the RemoteCacheListener object <p.
-     * @return The remoteType value
-     * @throws IOException
-     */
-    public RemoteType getRemoteType()
-        throws IOException
-    {
-        if ( log.isDebugEnabled() )
-        {
-            log.debug( "getRemoteType = [" + irca.getRemoteType() + "]" );
-        }
-        return irca.getRemoteType();
-    }
-
-    /**
-     * If this is configured to remove on put, then remove the element since it has been updated
-     * elsewhere. cd should be incomplete for faster transmission. We don't want to pass data only
-     * invalidation. The next time it is used the local cache will get the new version from the
-     * remote store.
-     * <p>
-     * If remove on put is not configured, then update the item.
-     * @param cb
-     * @throws IOException
-     */
-    public void handlePut( ICacheElement<K, V> cb )
-        throws IOException
-    {
-        if ( irca.getRemoveUponRemotePut() )
-        {
-            if ( log.isDebugEnabled() )
-            {
-                log.debug( "PUTTING ELEMENT FROM REMOTE, (  invalidating ) " );
-            }
-            handleRemove( cb.getCacheName(), cb.getKey() );
-        }
-        else
-        {
-            puts++;
-            if ( log.isDebugEnabled() )
-            {
-                log.debug( "PUTTING ELEMENT FROM REMOTE, ( updating ) " );
-                log.debug( "cb = " + cb );
-
-                if ( puts % 100 == 0 )
-                {
-                    log.debug( "puts = " + puts );
-                }
-            }
-
-            CompositeCache<K, V> cache = getCacheManager().getCache( cb.getCacheName() );
-
-            // Eventually the instance of will not be necessary.
-            if ( cb instanceof ICacheElementSerialized )
-            {
-                if ( log.isDebugEnabled() )
-                {
-                    log.debug( "Object needs to be deserialized." );
-                }
-                try
-                {
-                    cb = SerializationConversionUtil.getDeSerializedCacheElement(
-                            (ICacheElementSerialized<K, V>) cb, this.elementSerializer );
-                    if ( log.isDebugEnabled() )
-                    {
-                        log.debug( "Deserialized result = " + cb );
-                    }
-                }
-                catch ( IOException e )
-                {
-                    throw e;
-                }
-                catch ( ClassNotFoundException e )
-                {
-                    log.error( "Received a serialized version of a class that we don't know about.", e );
-                }
-            }
-
-            cache.localUpdate( cb );
-        }
-    }
-
-    /**
-     * Calls localRemove on the CompositeCache.
-     * <p>
-     * @param cacheName
-     * @param key
-     * @throws IOException
-     */
-    public void handleRemove( String cacheName, K key )
-        throws IOException
-    {
-        removes++;
-        if ( log.isDebugEnabled() )
-        {
-            if ( removes % 100 == 0 )
-            {
-                log.debug( "removes = " + removes );
-            }
-
-            log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
-        }
-
-        CompositeCache<K, V> cache = getCacheManager().getCache( cacheName );
-
-        cache.localRemove( key );
-    }
-
-    /**
-     * Calls localRemoveAll on the CompositeCache.
-     * <p>
-     * @param cacheName
-     * @throws IOException
-     */
-    public void handleRemoveAll( String cacheName )
-        throws IOException
-    {
-        if ( log.isDebugEnabled() )
-        {
-            log.debug( "handleRemoveAll> cacheName=" + cacheName );
-        }
-
-        CompositeCache<K, V> cache = getCacheManager().getCache( cacheName );
-        cache.localRemoveAll();
-    }
-
-    /**
-     * @param cacheName
-     * @throws IOException
-     */
-    public void handleDispose( String cacheName )
-        throws IOException
-    {
-        if ( log.isDebugEnabled() )
-        {
-            log.debug( "handleDispose> cacheName=" + cacheName );
-        }
-        // TODO consider what to do here, we really don't want to
-        // dispose, we just want to disconnect.
-        // just allow the cache to go into error recovery mode.
-        // getCacheManager().freeCache( cacheName, true );
-    }
-
-    /**
-     * Gets the cacheManager attribute of the RemoteCacheListener object. This is one of the few
-     * places that force the cache to be a singleton.
-     */
-    protected ICompositeCacheManager getCacheManager()
-    {
-        if ( cacheMgr == null )
-        {
-            try
-            {
-                cacheMgr = CompositeCacheManager.getInstance();
-
-                if ( log.isDebugEnabled() )
-                {
-                    log.debug( "had to get cacheMgr" );
-                    log.debug( "cacheMgr = " + cacheMgr );
-                }
-            }
-            catch (CacheException e)
-            {
-                log.error( "Could not get cacheMgr", e );
-            }
-        }
-        else
-        {
-            if ( log.isDebugEnabled() )
-            {
-                log.debug( "already got cacheMgr = " + cacheMgr );
-            }
-        }
-
-        return cacheMgr;
-    }
-
-    /**
-     * This is for debugging. It allows the remote server to log the address of clients.
-     * <p>
-     * @return String
-     * @throws IOException
-     */
-    public synchronized String getLocalHostAddress()
-        throws IOException
-    {
-        if ( localHostName == null )
-        {
-            try
-            {
-                localHostName = HostNameUtil.getLocalHostAddress();
-            }
-            catch ( UnknownHostException uhe )
-            {
-                localHostName = "unknown";
-            }
-        }
-        return localHostName;
-    }
-
-    /**
-     * For easier debugging.
-     * <p>
-     * @return Basic info on this listener.
-     */
-    @Override
-    public String toString()
-    {
-        StringBuffer buf = new StringBuffer();
-        buf.append( "\n AbstractRemoteCacheListener: " );
-        buf.append( "\n RemoteHost = " + irca.getRemoteHost() );
-        buf.append( "\n RemotePort = " + irca.getRemotePort() );
-        buf.append( "\n ListenerId = " + listenerId );
-        return buf.toString();
-    }
-}
+package org.apache.commons.jcs.auxiliary.remote;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.UnknownHostException;
+
+import org.apache.commons.jcs.access.exception.CacheException;
+import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
+import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
+import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType;
+import org.apache.commons.jcs.engine.behavior.ICacheElement;
+import org.apache.commons.jcs.engine.behavior.ICacheElementSerialized;
+import org.apache.commons.jcs.engine.behavior.ICompositeCacheManager;
+import org.apache.commons.jcs.engine.behavior.IElementSerializer;
+import org.apache.commons.jcs.engine.control.CompositeCache;
+import org.apache.commons.jcs.engine.control.CompositeCacheManager;
+import org.apache.commons.jcs.utils.net.HostNameUtil;
+import org.apache.commons.jcs.utils.serialization.SerializationConversionUtil;
+import org.apache.commons.jcs.utils.serialization.StandardSerializer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Shared listener base. */
+public abstract class AbstractRemoteCacheListener<K extends Serializable, V extends Serializable>
+    implements IRemoteCacheListener<K, V>, Serializable
+{
+    /** Don't change */
+    private static final long serialVersionUID = 32442324243243L;
+
+    /** The logger */
+    private final static Log log = LogFactory.getLog( AbstractRemoteCacheListener.class );
+
+    /** The cached name of the local host. The remote server gets this for logging purposes. */
+    private static String localHostName = null;
+
+    /** Has this client been shutdown. */
+    boolean disposed = false;
+
+    /**
+     * The cache manager used to put items in different regions. This is set lazily and should not
+     * be sent to the remote server.
+     */
+    protected transient ICompositeCacheManager cacheMgr;
+
+    /** The remote cache configuration object. */
+    protected IRemoteCacheAttributes irca;
+
+    /** Number of put requests received. For debugging only. */
+    protected int puts = 0;
+
+    /** Number of remove requests received. For debugging only. */
+    protected int removes = 0;
+
+    /** This is set by the remote cache server. */
+    protected long listenerId = 0;
+
+    /** Custom serializer. Standard by default. */
+    private transient IElementSerializer elementSerializer = new StandardSerializer();
+
+    /**
+     * Only need one since it does work for all regions, just reference by multiple region names.
+     * <p>
+     * The constructor exports this object, making it available to receive incoming calls. The
+     * callback port is anonymous unless a local port value was specified in the configuration.
+     * <p>
+     * @param irca
+     * @param cacheMgr
+     */
+    public AbstractRemoteCacheListener( IRemoteCacheAttributes irca, ICompositeCacheManager cacheMgr )
+    {
+        this.irca = irca;
+        this.cacheMgr = cacheMgr;
+    }
+
+    /**
+     * Let the remote cache set a listener_id. Since there is only one listener for all the regions
+     * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
+     * assume that it is a reconnect.
+     * <p>
+     * @param id The new listenerId value
+     * @throws IOException
+     */
+    public void setListenerId( long id )
+        throws IOException
+    {
+        listenerId = id;
+        if ( log.isInfoEnabled() )
+        {
+            log.info( "set listenerId = [" + id + "]" );
+        }
+    }
+
+    /**
+     * Gets the listenerId attribute of the RemoteCacheListener object. This is stored in the
+     * object. The RemoteCache object contains a reference to the listener and get the id this way.
+     * <p>
+     * @return The listenerId value
+     * @throws IOException
+     */
+    public long getListenerId()
+        throws IOException
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "get listenerId = [" + listenerId + "]" );
+        }
+        return listenerId;
+
+    }
+
+    /**
+     * Gets the remoteType attribute of the RemoteCacheListener object <p.
+     * @return The remoteType value
+     * @throws IOException
+     */
+    public RemoteType getRemoteType()
+        throws IOException
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "getRemoteType = [" + irca.getRemoteType() + "]" );
+        }
+        return irca.getRemoteType();
+    }
+
+    /**
+     * If this is configured to remove on put, then remove the element since it has been updated
+     * elsewhere. cd should be incomplete for faster transmission. We don't want to pass data only
+     * invalidation. The next time it is used the local cache will get the new version from the
+     * remote store.
+     * <p>
+     * If remove on put is not configured, then update the item.
+     * @param cb
+     * @throws IOException
+     */
+    public void handlePut( ICacheElement<K, V> cb )
+        throws IOException
+    {
+        if ( irca.getRemoveUponRemotePut() )
+        {
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "PUTTING ELEMENT FROM REMOTE, (  invalidating ) " );
+            }
+            handleRemove( cb.getCacheName(), cb.getKey() );
+        }
+        else
+        {
+            puts++;
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "PUTTING ELEMENT FROM REMOTE, ( updating ) " );
+                log.debug( "cb = " + cb );
+
+                if ( puts % 100 == 0 )
+                {
+                    log.debug( "puts = " + puts );
+                }
+            }
+
+            CompositeCache<K, V> cache = getCacheManager().getCache( cb.getCacheName() );
+
+            // Eventually the instance of will not be necessary.
+            if ( cb instanceof ICacheElementSerialized )
+            {
+                if ( log.isDebugEnabled() )
+                {
+                    log.debug( "Object needs to be deserialized." );
+                }
+                try
+                {
+                    cb = SerializationConversionUtil.getDeSerializedCacheElement(
+                            (ICacheElementSerialized<K, V>) cb, this.elementSerializer );
+                    if ( log.isDebugEnabled() )
+                    {
+                        log.debug( "Deserialized result = " + cb );
+                    }
+                }
+                catch ( IOException e )
+                {
+                    throw e;
+                }
+                catch ( ClassNotFoundException e )
+                {
+                    log.error( "Received a serialized version of a class that we don't know about.", e );
+                }
+            }
+
+            cache.localUpdate( cb );
+        }
+    }
+
+    /**
+     * Calls localRemove on the CompositeCache.
+     * <p>
+     * @param cacheName
+     * @param key
+     * @throws IOException
+     */
+    public void handleRemove( String cacheName, K key )
+        throws IOException
+    {
+        removes++;
+        if ( log.isDebugEnabled() )
+        {
+            if ( removes % 100 == 0 )
+            {
+                log.debug( "removes = " + removes );
+            }
+
+            log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
+        }
+
+        CompositeCache<K, V> cache = getCacheManager().getCache( cacheName );
+
+        cache.localRemove( key );
+    }
+
+    /**
+     * Calls localRemoveAll on the CompositeCache.
+     * <p>
+     * @param cacheName
+     * @throws IOException
+     */
+    public void handleRemoveAll( String cacheName )
+        throws IOException
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "handleRemoveAll> cacheName=" + cacheName );
+        }
+
+        CompositeCache<K, V> cache = getCacheManager().getCache( cacheName );
+        cache.localRemoveAll();
+    }
+
+    /**
+     * @param cacheName
+     * @throws IOException
+     */
+    public void handleDispose( String cacheName )
+        throws IOException
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "handleDispose> cacheName=" + cacheName );
+        }
+        // TODO consider what to do here, we really don't want to
+        // dispose, we just want to disconnect.
+        // just allow the cache to go into error recovery mode.
+        // getCacheManager().freeCache( cacheName, true );
+    }
+
+    /**
+     * Gets the cacheManager attribute of the RemoteCacheListener object. This is one of the few
+     * places that force the cache to be a singleton.
+     */
+    protected ICompositeCacheManager getCacheManager()
+    {
+        if ( cacheMgr == null )
+        {
+            try
+            {
+                cacheMgr = CompositeCacheManager.getInstance();
+
+                if ( log.isDebugEnabled() )
+                {
+                    log.debug( "had to get cacheMgr" );
+                    log.debug( "cacheMgr = " + cacheMgr );
+                }
+            }
+            catch (CacheException e)
+            {
+                log.error( "Could not get cacheMgr", e );
+            }
+        }
+        else
+        {
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "already got cacheMgr = " + cacheMgr );
+            }
+        }
+
+        return cacheMgr;
+    }
+
+    /**
+     * This is for debugging. It allows the remote server to log the address of clients.
+     * <p>
+     * @return String
+     * @throws IOException
+     */
+    public synchronized String getLocalHostAddress()
+        throws IOException
+    {
+        if ( localHostName == null )
+        {
+            try
+            {
+                localHostName = HostNameUtil.getLocalHostAddress();
+            }
+            catch ( UnknownHostException uhe )
+            {
+                localHostName = "unknown";
+            }
+        }
+        return localHostName;
+    }
+
+    /**
+     * For easier debugging.
+     * <p>
+     * @return Basic info on this listener.
+     */
+    @Override
+    public String toString()
+    {
+        StringBuffer buf = new StringBuffer();
+        buf.append( "\n AbstractRemoteCacheListener: " );
+        buf.append( "\n RemoteHost = " + irca.getRemoteHost() );
+        buf.append( "\n RemotePort = " + irca.getRemotePort() );
+        buf.append( "\n ListenerId = " + listenerId );
+        return buf.toString();
+    }
+}



Mime
View raw message