geode-commits mailing list archives

From kl...@apache.org
Subject [13/21] incubator-geode git commit: GEODE-1566: rename GeodeRedisServer and repackage redis code into org.apache.geode
Date Fri, 08 Jul 2016 18:21:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfd481e0/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java b/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
deleted file mode 100644
index 81f87d5..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
+++ /dev/null
@@ -1,695 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.redis;
-
-import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
-
-import com.gemstone.gemfire.LogWriter;
-import com.gemstone.gemfire.cache.*;
-import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.hll.HyperLogLogPlus;
-import com.gemstone.gemfire.internal.redis.*;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.*;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.oio.OioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.oio.OioServerSocketChannel;
-import io.netty.util.concurrent.Future;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.Map.Entry;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * The GemFireRedisServer is a server that understands the Redis protocol. As
- * commands are sent to the server, each command is picked up by a thread,
- * interpreted and then executed and a response is sent back to the client. The
- * default connection port is 6379 but that can be altered when run through GFSH
- * or started through the provided static main class.
- * <p>
- * Each Redis data type instance is stored in a separate {@link Region} except
- * for Strings and HyperLogLogs, which are each stored collectively in a single
- * Region. Those Regions, along with a meta data region used internally, are
- * protected so the client may not store keys with the name {@link GemFireRedisServer#REDIS_META_DATA_REGION}
- * or {@link GemFireRedisServer#STRING_REGION}. The default Region type is 
- * {@link RegionShortcut#PARTITION} although this can be changed by specifying the
- * SystemProperty {@value #DEFAULT_REGION_SYS_PROP_NAME} to a type defined by {@link RegionShortcut}.
- * If the {@link GemFireRedisServer#NUM_THREADS_SYS_PROP_NAME} system property is set to 0,
- * one thread per client will be created. Otherwise a worker thread pool of specified size is
- * used or a default size of 4 * {@link Runtime#availableProcessors()} if the property is not set.
- * <p>
- * Setting the AUTH password requires setting the property "redis-password" just as "redis-port"
- * would be in xml or through GFSH.
- * <p>
- * The supported commands are as follows:
- * <p>
- * Supported String commands - APPEND, BITCOUNT, BITOP, BITPOS, DECR, DECRBY, 
- * GET, GETBIT, GETRANGE, GETSET, INCR, INCRBY, INCRBYFLOAT, MGET, MSET, MSETNX,
- * PSETEX, SET, SETBIT, SETEX, SETNX, STRLEN
- * <p>
- * Supported List commands - LINDEX, LLEN, LPOP, LPUSH, LPUSHX, LRANGE,
- * LREM, LSET, LTRIM, RPOP, RPUSH, RPUSHX
- * <p>
- * Supported Hash commands - HDEL, HEXISTS, HGET, HGETALL, HINCRBY, HINCRBYFLOAT,
- * HKEYS, HMGET, HMSET, HSETNX, HLEN, HSCAN, HSET, HVALS
- * <p>
- * Supported Set commands - SADD, SCARD, SDIFF, SDIFFSTORE, SINTER,
- * SINTERSTORE, SISMEMBER, SMEMBERS, SMOVE, SREM, SPOP, SRANDMEMBER, 
- * SCAN, SUNION, SUNIONSTORE
- * <p>
- * Supported SortedSet commands - ZADD, ZCARD, ZCOUNT, ZINCRBY, ZLEXCOUNT,
- * ZRANGE, ZRANGEBYLEX, ZRANGEBYSCORE, ZRANK, ZREM, ZREMRANGEBYLEX,
- * ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE, ZREVRANGEBYSCORE, ZREVRANK,
- * ZSCAN, ZSCORE
- * <p>
- * Supported HyperLogLog commands - PFADD, PFCOUNT, PFMERGE
- * <p>
- * Supported Keys commands - DEL, DBSIZE, EXISTS, EXPIRE, EXPIREAT, FLUSHDB, FLUSHALL, 
- * KEYS, PERSIST, PEXPIRE, PEXPIREAT, PTTL, SCAN, TTL
- * <p>
- * Supported Transaction commands - DISCARD, EXEC, MULTI
- * <P>
- * Supported Server commands - AUTH, ECHO, PING, TIME, QUIT
- * <p>
- * <p>
- * The command executors are not explicitly documented but the functionality
- * can be found at <a href="http://redis.io/commands">Redis Commands</a>
- * <p>
- * Exceptions to the Redis Commands Documents:<p>
- * <ul>
- * <li>Any command that removes keys and returns a count of removed
- * entries will not return a total remove count but rather a count of how
- * many entries have been removed that existed on the local vm, though 
- * all entries will be removed</li>
- * <li>Any command that returns a count of newly set members has an
- * unspecified return value. The command will work just as the Redis protocol
- * states, but the count will not necessarily reflect the number of members
- * newly set as opposed to overridden.</li>
- * <li>Transactions work just as they would on a Redis instance, but they are
- * local transactions. Transactions cannot be executed on data that is not local to the 
- * executing server, that is on a partitioned region in a different server 
- * instance or on a persistent region that does not have transactions enabled.
- * Also, you cannot watch or unwatch keys as all keys within a GemFire 
- * transaction are watched by default.</li>
- * </ul>
- *
- */
-
-public class GemFireRedisServer {
-
-  /**
-   * Thread used to start main method
-   */
-  private static Thread mainThread = null;
-
-  /**
-   * The default Redis port as specified by the Redis protocol, {@value #DEFAULT_REDIS_SERVER_PORT}
-   */
-  public static final int DEFAULT_REDIS_SERVER_PORT = 6379;
-
-  /**
-   * The number of threads that will work on handling requests
-   */
-  private final int numWorkerThreads;
-
-  /**
-   * The number of threads that will work the socket selectors
-   */
-  private final int numSelectorThreads;
-
-  /**
-   * The actual port being used by the server
-   */
-  private final int serverPort;
-
-  /**
-   * The address to bind to
-   */
-  private final String bindAddress;
-
-  /**
-   * Connection timeout in milliseconds
-   */
-  private static final int connectTimeoutMillis = 1000;
-
-  /**
-   * Temporary flag indicating whether to use the old single-thread-per-connection
-   * model for the worker group
-   */
-  private boolean singleThreadPerConnection;
-
-  /**
-   * Logging level
-   */
-  private final String logLevel;
-
-  /**
-   * The cache instance pointer on this vm
-   */
-  private Cache cache;
-
-  /**
-   * Channel to be closed when shutting down
-   */
-  private Channel serverChannel;
-
-  /**
-   * Gem logwriter
-   */
-  private LogWriter logger;
-
-  private RegionProvider regionCache;
-
-  private final MetaCacheListener metaListener;
-
-  private EventLoopGroup bossGroup;
-  private EventLoopGroup workerGroup;
-  private final static int numExpirationThreads = 1;
-  private final ScheduledExecutorService expirationExecutor;
-
-  /**
-   * Map of futures to be executed for key expirations
-   */
-  private final ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationFutures;
-
-
-  /**
-   * The field that defines the name of the {@link Region} which holds all of
-   * the strings. The current value of this field is {@value #STRING_REGION}.
-   */
-  public static final String STRING_REGION = "__StRiNgS";
-
-  /**
-   * The field that defines the name of the {@link Region} which holds all of
-   * the HyperLogLogs. The current value of this field is {@value #HLL_REGION}.
-   */
-  public static final String HLL_REGION = "__HlL";
-
-  /**
-   * The field that defines the name of the {@link Region} which holds all of
-   * the Redis meta data. The current value of this field is {@value #REDIS_META_DATA_REGION}.
-   */
-  public static final String REDIS_META_DATA_REGION = "__ReDiS_MeTa_DaTa";
-
-  /**
-   * The system property name used to set the default {@link Region} creation
-   * type. The property name is {@value #DEFAULT_REGION_SYS_PROP_NAME} and the
-   * acceptable values are types defined by {@link RegionShortcut}, 
-   * i.e. "PARTITION" would be used for {@link RegionShortcut#PARTITION}.
-   */
-  public static final String DEFAULT_REGION_SYS_PROP_NAME = "gemfireredis.regiontype";
-
-  /**
-   * System property name that can be used to set the number of threads to be
-   * used by the GemFireRedisServer
-   */
-  public static final String NUM_THREADS_SYS_PROP_NAME = "gemfireredis.numthreads";
-
-  /**
-   * The actual {@link RegionShortcut} type specified by the system property
-   * {@value #DEFAULT_REGION_SYS_PROP_NAME}.
-   */
-  public final RegionShortcut DEFAULT_REGION_TYPE;
-
-  private boolean shutdown;
-  private boolean started;
-
-  /**
-   * Determine the {@link RegionShortcut} type from a String value.
-   * If the String value doesn't map to a RegionShortcut type then 
-   * {@link RegionShortcut#PARTITION} will be used by default.
-   * 
-   * @return {@link RegionShortcut}
-   */
-  private static RegionShortcut setRegionType() {
-    String regionType = System.getProperty(DEFAULT_REGION_SYS_PROP_NAME, "PARTITION");
-    RegionShortcut type;
-    try {
-      type = RegionShortcut.valueOf(regionType);
-    } catch (Exception e) {
-      type = RegionShortcut.PARTITION;
-    }
-    return type;
-  }
-
-  /**
-   * Helper method to set the number of worker threads
-   * 
-   * @return If the System property {@value #NUM_THREADS_SYS_PROP_NAME} is set then that number
-   * is used, otherwise 4 * # of cores
-   */
-  private int setNumWorkerThreads() {
-    String prop = System.getProperty(NUM_THREADS_SYS_PROP_NAME);
-    int numCores = Runtime.getRuntime().availableProcessors();
-    int def = 4 * numCores;
-    if (prop == null || prop.isEmpty())
-      return def;
-    int threads;
-    try {
-      threads = Integer.parseInt(prop);
-    } catch (NumberFormatException e) {
-      return def;
-    }
-    return threads;
-  }
-
-  /**
-   * Constructor for {@link GemFireRedisServer} that will start the
-   * server on the given port and bind to the first non-loopback address
-   * 
-   * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by default
-   */
-  public GemFireRedisServer(int port) {
-    this(null, port, null);
-  }
-
-  /**
-   * Constructor for {@link GemFireRedisServer} that will start the
-   * server and bind to the given address and port
-   * 
-   * @param bindAddress The address to which the server will attempt to bind
-   * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by default if argument is less than or equal to 0
-   */
-  public GemFireRedisServer(String bindAddress, int port) {
-    this(bindAddress, port, null);
-  }
-
-
-  /**
-   * Constructor for {@link GemFireRedisServer} that will start the
-   * server and bind to the given address and port. Keep in mind that the
-   * log level configuration will only be set if a {@link Cache} does not already
-   * exist; if one already exists, setting that property will have no effect.
-   * 
-   * @param bindAddress The address to which the server will attempt to bind
-   * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by default if argument is less than or equal to 0
-   * @param logLevel The logging level to be used by GemFire
-   */
-  public GemFireRedisServer(String bindAddress, int port, String logLevel) {
-    if (port <= 0)
-      this.serverPort = DEFAULT_REDIS_SERVER_PORT;
-    else
-      this.serverPort = port;
-    this.bindAddress = bindAddress;
-    this.logLevel = logLevel;
-    this.numWorkerThreads = setNumWorkerThreads();
-    if (this.numWorkerThreads == 0)
-      this.singleThreadPerConnection = true;
-    this.numSelectorThreads = 1;
-    this.metaListener = new MetaCacheListener();
-    this.expirationFutures = new ConcurrentHashMap<ByteArrayWrapper, ScheduledFuture<?>>();
-    this.expirationExecutor = Executors.newScheduledThreadPool(numExpirationThreads, new ThreadFactory() {
-      private final AtomicInteger counter = new AtomicInteger();
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread t = new Thread(r);
-        t.setName("GemFireRedis-ScheduledExecutor-" + counter.incrementAndGet());
-        t.setDaemon(true);
-        return t;
-      }
-
-    });
-    this.DEFAULT_REGION_TYPE = setRegionType();
-    this.shutdown = false;
-    this.started = false;
-  }
-
-  /**
-   * Helper method to get the host name to bind to
-   * 
-   * @return The InetAddress to bind to
-   * @throws UnknownHostException
-   */
-  private InetAddress getBindAddress() throws UnknownHostException {
-    return this.bindAddress == null || this.bindAddress.isEmpty()
-        ? SocketCreator.getLocalHost()
-            : InetAddress.getByName(this.bindAddress);
-  }
-
-  /**
-   * This is the function to call on a {@link GemFireRedisServer} instance
-   * to start it running
-   */
-  public synchronized void start() {
-    if (!started) {
-      try {
-        startGemFire();
-        initializeRedis();
-        startRedisServer();
-      } catch (IOException e) {
-        throw new RuntimeException("Could not start Server", e);
-      } catch (InterruptedException e) {
-        throw new RuntimeException("Could not start Server", e);
-      }
-      started = true;
-    }
-  }
-
-  /**
-   * Initializes the {@link Cache}, creates the Regions needed by Redis,
-   * and declares those Regions to be protected.
-   * Also, every {@link GemFireRedisServer} will check for entries already in the 
-   * meta data Region.
-   */
-  private void startGemFire() {
-    Cache c = GemFireCacheImpl.getInstance();
-    if (c == null) {
-      synchronized (GemFireRedisServer.class) {
-        c = GemFireCacheImpl.getInstance();
-        if (c == null) {
-          CacheFactory cacheFactory = new CacheFactory();
-          if (logLevel != null)
-            cacheFactory.set(LOG_LEVEL, logLevel);
-          c = cacheFactory.create();
-        }
-      }
-    }
-    this.cache = c;
-    this.logger = c.getLogger();
-  }
-
-  private void initializeRedis() {
-    synchronized (this.cache) {
-      RegionFactory<String, RedisDataType> rfMeta = cache.createRegionFactory(RegionShortcut.REPLICATE);
-      rfMeta.addCacheListener(this.metaListener);
-      RegionFactory<ByteArrayWrapper, ByteArrayWrapper> rfString = cache.createRegionFactory(DEFAULT_REGION_TYPE);
-      RegionFactory<ByteArrayWrapper, HyperLogLogPlus> rfHLL = cache.createRegionFactory(DEFAULT_REGION_TYPE);
-      Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion;
-      if ((stringsRegion = this.cache.getRegion(STRING_REGION)) == null)
-        stringsRegion = rfString.create(GemFireRedisServer.STRING_REGION);
-      Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion;
-      if ((hLLRegion = this.cache.getRegion(HLL_REGION)) == null)
-        hLLRegion = rfHLL.create(HLL_REGION);
-      Region<String, RedisDataType> redisMetaData;
-      if ((redisMetaData = this.cache.getRegion(REDIS_META_DATA_REGION)) == null)
-        redisMetaData = rfMeta.create(REDIS_META_DATA_REGION);
-      this.regionCache = new RegionProvider(stringsRegion, hLLRegion, redisMetaData, expirationFutures, expirationExecutor, this.DEFAULT_REGION_TYPE);
-      redisMetaData.put(REDIS_META_DATA_REGION, RedisDataType.REDIS_PROTECTED);
-      redisMetaData.put(HLL_REGION, RedisDataType.REDIS_PROTECTED);
-      redisMetaData.put(STRING_REGION, RedisDataType.REDIS_PROTECTED);
-    }
-    checkForRegions();
-  }
-
-  private void checkForRegions() {
-    Collection<Entry<String, RedisDataType>> entrySet = this.regionCache.metaEntrySet();
-    for (Entry<String, RedisDataType> entry: entrySet) {
-      String regionName = entry.getKey();
-      RedisDataType type = entry.getValue();
-      Region<?, ?> newRegion = cache.getRegion(regionName);
-      if (newRegion == null && type != RedisDataType.REDIS_STRING && type != RedisDataType.REDIS_HLL && type != RedisDataType.REDIS_PROTECTED) {
-        try {
-          this.regionCache.createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(regionName), type);
-        } catch (Exception e) {
-          if (logger.errorEnabled())
-            logger.error(e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Helper method to start the server listening for connections. The
-   * server is bound to the port specified by {@link GemFireRedisServer#serverPort}
-   * 
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private void startRedisServer() throws IOException, InterruptedException {
-    ThreadFactory selectorThreadFactory = new ThreadFactory() {
-      private final AtomicInteger counter = new AtomicInteger();
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread t = new Thread(r);
-        t.setName("GemFireRedisServer-SelectorThread-" + counter.incrementAndGet());
-        t.setDaemon(true);
-        return t;
-      }
-
-    };
-
-    ThreadFactory workerThreadFactory = new ThreadFactory() {
-      private final AtomicInteger counter = new AtomicInteger();
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread t = new Thread(r);
-        t.setName("GemFireRedisServer-WorkerThread-" + counter.incrementAndGet());
-        return t;
-      }
-
-    };
-
-    bossGroup = null;
-    workerGroup = null;
-    Class<? extends ServerChannel> socketClass = null;
-    if (singleThreadPerConnection) {
-      bossGroup = new OioEventLoopGroup(Integer.MAX_VALUE, selectorThreadFactory);
-      workerGroup = new OioEventLoopGroup(Integer.MAX_VALUE, workerThreadFactory);
-      socketClass = OioServerSocketChannel.class;
-    } else {
-      bossGroup = new NioEventLoopGroup(this.numSelectorThreads, selectorThreadFactory);
-      workerGroup = new NioEventLoopGroup(this.numWorkerThreads, workerThreadFactory);
-      socketClass = NioServerSocketChannel.class;
-    }
-    InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
-    String pwd = system.getConfig().getRedisPassword();
-    final byte[] pwdB = Coder.stringToBytes(pwd);
-    ServerBootstrap b = new ServerBootstrap();
-    b.group(bossGroup, workerGroup)
-    .channel(socketClass)
-    .childHandler(new ChannelInitializer<SocketChannel>() {
-      @Override
-      public void initChannel(SocketChannel ch) throws Exception {
-        if (logger.fineEnabled())
-          logger.fine("GemFireRedisServer-Connection established with " + ch.remoteAddress());
-        ChannelPipeline p = ch.pipeline();
-        p.addLast(ByteToCommandDecoder.class.getSimpleName(), new ByteToCommandDecoder());
-        p.addLast(ExecutionHandlerContext.class.getSimpleName(), new ExecutionHandlerContext(ch, cache, regionCache, GemFireRedisServer.this, pwdB));
-      }
-    })
-    .option(ChannelOption.SO_REUSEADDR, true)
-    .option(ChannelOption.SO_RCVBUF, getBufferSize())
-    .childOption(ChannelOption.SO_KEEPALIVE, true)
-    .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, GemFireRedisServer.connectTimeoutMillis)
-    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-
-    // Bind and start to accept incoming connections.
-    ChannelFuture f = b.bind(new InetSocketAddress(getBindAddress(), serverPort)).sync();
-    if (this.logger.infoEnabled()) {
-      String logMessage = "GemFireRedisServer started {" + getBindAddress() + ":" + serverPort + "}, Selector threads: " + this.numSelectorThreads;
-      if (this.singleThreadPerConnection)
-        logMessage += ", One worker thread per connection";
-      else
-        logMessage += ", Worker threads: " + this.numWorkerThreads;
-      this.logger.info(logMessage);
-    }
-    this.serverChannel = f.channel();
-  }
-
-  /**
-   * Takes an entry event and processes it. If the entry denotes that a
-   * {@link RedisDataType#REDIS_LIST} or {@link RedisDataType#REDIS_SORTEDSET}
-   * was created then this function will make the necessary calls to create the 
-   * parameterized queries for those keys.
-   * 
-   * @param event EntryEvent from meta data region
-   */
-  private void afterKeyCreate(EntryEvent<String, RedisDataType> event) {
-    if (event.isOriginRemote()) {
-      final String key = (String) event.getKey();
-      final RedisDataType value = event.getNewValue();
-      if (value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL && value != RedisDataType.REDIS_PROTECTED) {
-        try {
-          this.regionCache.createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(key), value);
-        } catch (RegionDestroyedException ignore) { // Region already destroyed, ignore
-        }
-      }
-    }
-  }
-
-  /**
-   * When a key is removed then this function will make sure the queries
-   * associated with the key are also removed from each vm to avoid unnecessary
-   * data retention
-   */
-  private void afterKeyDestroy(EntryEvent<String, RedisDataType> event) {
-    if (event.isOriginRemote()) {
-      final String key = (String) event.getKey();
-      final RedisDataType value = event.getOldValue();
-      if (value != null && value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL && value != RedisDataType.REDIS_PROTECTED) {
-        ByteArrayWrapper kW = Coder.stringToByteArrayWrapper(key);
-        Region<?, ?> r = this.regionCache.getRegion(kW);
-        if (r != null) { 
-          this.regionCache.removeRegionReferenceLocally(kW, value);
-        }
-      }
-    }
-  }
-
-  private final class MetaCacheListener extends CacheListenerAdapter<String, RedisDataType> {
-
-    @Override
-    public void afterCreate(EntryEvent<String, RedisDataType> event) {
-      afterKeyCreate(event);
-    }
-
-    @Override
-    public void afterDestroy(EntryEvent<String, RedisDataType> event) {
-      afterKeyDestroy(event);
-    }
-  }
-
-  /**
-   * Helper method to get the GemFire-configured socket buffer size,
-   * which defaults to 32k
-   * 
-   * @return Buffer size to use for server
-   */
-  private int getBufferSize() {
-    InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
-    return system.getConfig().getSocketBufferSize();
-  }
-
-  /**
-   * Shutdown method for {@link GemFireRedisServer}. This closes the {@link Cache},
-   * interrupts all execution and forcefully closes all connections.
-   */
-  public synchronized void shutdown() {
-    if (!shutdown) {
-      if (logger.infoEnabled())
-        logger.info("GemFireRedisServer shutting down");
-      ChannelFuture closeFuture = this.serverChannel.closeFuture();
-      Future<?> c = workerGroup.shutdownGracefully();
-      Future<?> c2 = bossGroup.shutdownGracefully();
-      this.serverChannel.close();
-      c.syncUninterruptibly();
-      c2.syncUninterruptibly();
-      this.regionCache.close();
-      if (mainThread != null)
-        mainThread.interrupt();
-      for (ScheduledFuture<?> f : this.expirationFutures.values())
-        f.cancel(true);
-      this.expirationFutures.clear();
-      this.expirationExecutor.shutdownNow();
-      closeFuture.syncUninterruptibly();
-      shutdown = true;
-    }
-  }
-
-  /**
-   * Static main method that allows the {@link GemFireRedisServer} to be 
-   * started from the command line. The supported command line arguments are
-   * <p>-port=
-   * <br>-bind-address=
-   * <br>-log-level=
-   * 
-   * @param args Command line args
-   */
-  public static void main(String[] args) {
-    int port = DEFAULT_REDIS_SERVER_PORT;
-    String bindAddress = null;
-    String logLevel = null;
-    for (String arg: args) {
-      if (arg.startsWith("-port"))
-        port = getPort(arg);
-      else if (arg.startsWith("-bind-address"))
-        bindAddress = getBindAddress(arg);
-      else if (arg.startsWith("-log-level"))
-        logLevel = getLogLevel(arg);
-    }
-    mainThread = Thread.currentThread();
-    GemFireRedisServer server = new GemFireRedisServer(bindAddress, port, logLevel);
-    server.start();
-    while(true) {
-      try {
-        Thread.sleep(Long.MAX_VALUE);
-      } catch (InterruptedException e1) { 
-        break;
-      } catch (Exception e) {
-      }
-    }
-  }
-
-  /**
-   * Helper method to parse the port to a number
-   * 
-   * @param arg String where the argument is
-   * @return The port number when the correct syntax was used, 
-   * otherwise will return {@link #DEFAULT_REDIS_SERVER_PORT}
-   */
-  private static int getPort(String arg) {
-    int port = DEFAULT_REDIS_SERVER_PORT;
-    if (arg != null && arg.length() > 6) {
-      if (arg.startsWith("-port")) {
-        String p = arg.substring(arg.indexOf('=') + 1);
-        p = p.trim();
-        try {
-          port = Integer.parseInt(p);
-        } catch (NumberFormatException e) {
-          System.out.println("Unable to parse port, using default port");
-        }
-      }
-    }
-    return port;
-  }
-
-  /**
-   * Helper method to parse bind address
-   * 
-   * @param arg String holding bind address
-   * @return Bind address
-   */
-  private static String getBindAddress(String arg) {
-    String address = null;
-    if (arg != null && arg.length() > 14) {
-      if (arg.startsWith("-bind-address")) {
-        String p = arg.substring(arg.indexOf('=') + 1);
-        address = p.trim();
-      }
-    }
-    return address;
-  }
-
-  /**
-   * Helper method to parse log level
-   * 
-   * @param arg String holding log level
-   * @return Log level
-   */
-  private static String getLogLevel(String arg) {
-    String logLevel = null;
-    if (arg != null && arg.length() > 11) {
-      if (arg.startsWith("-log-level")) {
-        String p = arg.substring(arg.indexOf('=') + 1);
-        logLevel = p.trim();
-      }
-    }
-    return logLevel;
-  }
-
-}
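
For context, the class removed above (and re-added below under the new package) exposes a small
programmatic API: construct it, then call start() and later shutdown(). Below is a minimal embedding
sketch, assuming geode-core and its Netty dependency are on the classpath and that a default (loner)
Cache is acceptable; the class, method, and property names come from the code above, while the
example class itself and its settings are purely illustrative:

    // Illustrative embedding sketch only; not part of this commit.
    import com.gemstone.gemfire.redis.GemFireRedisServer;

    public class EmbeddedRedisExample {
      public static void main(String[] args) {
        // Optional: pick the RegionShortcut used for new Redis structures (see Javadoc above).
        System.setProperty(GemFireRedisServer.DEFAULT_REGION_SYS_PROP_NAME, "PARTITION");

        // Bind to localhost:6379 with GemFire log level "info".
        GemFireRedisServer server = new GemFireRedisServer("localhost", 6379, "info");
        server.start();   // creates or reuses the Cache, creates the Redis Regions, binds the listener

        // ... serve Redis clients, then shut down cleanly ...
        server.shutdown();
      }
    }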

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfd481e0/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java b/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java
new file mode 100644
index 0000000..e586829
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java
@@ -0,0 +1,719 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.redis;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.hll.HyperLogLogPlus;
+
+import org.apache.geode.redis.internal.ByteArrayWrapper;
+import org.apache.geode.redis.internal.ByteToCommandDecoder;
+import org.apache.geode.redis.internal.Coder;
+import org.apache.geode.redis.internal.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.RedisDataType;
+import org.apache.geode.redis.internal.RegionProvider;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.oio.OioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.oio.OioServerSocketChannel;
+import io.netty.util.concurrent.Future;
+
+/**
+ * The GeodeRedisServer is a server that understands the Redis protocol. As
+ * commands are sent to the server, each command is picked up by a thread,
+ * interpreted and then executed and a response is sent back to the client. The
+ * default connection port is 6379 but that can be altered when run through GFSH
+ * or started through the provided static main class.
+ * <p>
+ * Each Redis data type instance is stored in a separate {@link Region} except
+ * for Strings and HyperLogLogs, which are each stored collectively in a single
+ * Region. Those Regions, along with a meta data region used internally, are
+ * protected so the client may not store keys with the name {@link GeodeRedisServer#REDIS_META_DATA_REGION}
+ * or {@link GeodeRedisServer#STRING_REGION}. The default Region type is
+ * {@link RegionShortcut#PARTITION} although this can be changed by specifying the
+ * SystemProperty {@value #DEFAULT_REGION_SYS_PROP_NAME} to a type defined by {@link RegionShortcut}.
+ * If the {@link GeodeRedisServer#NUM_THREADS_SYS_PROP_NAME} system property is set to 0,
+ * one thread per client will be created. Otherwise a worker thread pool of specified size is
+ * used or a default size of 4 * {@link Runtime#availableProcessors()} if the property is not set.
+ * <p>
+ * Setting the AUTH password requires setting the property "redis-password" just as "redis-port"
+ * would be in xml or through GFSH.
+ * <p>
+ * The supported commands are as follows:
+ * <p>
+ * Supported String commands - APPEND, BITCOUNT, BITOP, BITPOS, DECR, DECRBY, 
+ * GET, GETBIT, GETRANGE, GETSET, INCR, INCRBY, INCRBYFLOAT, MGET, MSET, MSETNX,
+ * PSETEX, SET, SETBIT, SETEX, SETNX, STRLEN
+ * <p>
+ * Supported List commands - LINDEX, LLEN, LPOP, LPUSH, LPUSHX, LRANGE,
+ * LREM, LSET, LTRIM, RPOP, RPUSH, RPUSHX
+ * <p>
+ * Supported Hash commands - HDEL, HEXISTS, HGET, HGETALL, HINCRBY, HINCRBYFLOAT,
+ * HKEYS, HMGET, HMSET, HSETNX, HLEN, HSCAN, HSET, HVALS
+ * <p>
+ * Supported Set commands - SADD, SCARD, SDIFF, SDIFFSTORE, SINTER,
+ * SINTERSTORE, SISMEMBER, SMEMBERS, SMOVE, SREM, SPOP, SRANDMEMBER, 
+ * SCAN, SUNION, SUNIONSTORE
+ * <p>
+ * Supported SortedSet commands - ZADD, ZCARD, ZCOUNT, ZINCRBY, ZLEXCOUNT,
+ * ZRANGE, ZRANGEBYLEX, ZRANGEBYSCORE, ZRANK, ZREM, ZREMRANGEBYLEX,
+ * ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE, ZREVRANGEBYSCORE, ZREVRANK,
+ * ZSCAN, ZSCORE
+ * <p>
+ * Supported HyperLogLog commands - PFADD, PFCOUNT, PFMERGE
+ * <p>
+ * Supported Keys commands - DEL, DBSIZE, EXISTS, EXPIRE, EXPIREAT, FLUSHDB, FLUSHALL, 
+ * KEYS, PERSIST, PEXPIRE, PEXPIREAT, PTTL, SCAN, TTL
+ * <p>
+ * Supported Transaction commands - DISCARD, EXEC, MULTI
+ * <P>
+ * Supported Server commands - AUTH, ECHO, PING, TIME, QUIT
+ * <p>
+ * <p>
+ * The command executors are not explicitly documented but the functionality
+ * can be found at <a href="http://redis.io/commands">Redis Commands</a>
+ * <p>
+ * Exceptions to the Redis Commands Documents:<p>
+ * <ul>
+ * <li>Any command that removes keys and returns a count of removed
+ * entries will not return a total remove count but rather a count of how
+ * many entries have been removed that existed on the local vm, though 
+ * all entries will be removed</li>
+ * <li>Any command that returns a count of newly set members has an
+ * unspecified return value. The command will work just as the Redis protocol
+ * states, but the count will not necessarily reflect the number of members
+ * newly set as opposed to overridden.</li>
+ * <li>Transactions work just as they would on a Redis instance, but they are
+ * local transactions. Transactions cannot be executed on data that is not local to the 
+ * executing server, that is on a partitioned region in a different server 
+ * instance or on a persistent region that does not have transactions enabled.
+ * Also, you cannot watch or unwatch keys as all keys within a GemFire 
+ * transaction are watched by default.</li>
+ * </ul>
+ *
+ */
+
+public class GeodeRedisServer {
+
+  /**
+   * Thread used to start main method
+   */
+  private static Thread mainThread = null;
+
+  /**
+   * The default Redis port as specified by the Redis protocol, {@value #DEFAULT_REDIS_SERVER_PORT}
+   */
+  public static final int DEFAULT_REDIS_SERVER_PORT = 6379;
+
+  /**
+   * The number of threads that will work on handling requests
+   */
+  private final int numWorkerThreads;
+
+  /**
+   * The number of threads that will work the socket selectors
+   */
+  private final int numSelectorThreads;
+
+  /**
+   * The actual port being used by the server
+   */
+  private final int serverPort;
+
+  /**
+   * The address to bind to
+   */
+  private final String bindAddress;
+
+  /**
+   * Connection timeout in milliseconds
+   */
+  private static final int connectTimeoutMillis = 1000;
+
+  /**
+   * Temporary flag indicating whether to use the old single-thread-per-connection
+   * model for the worker group
+   */
+  private boolean singleThreadPerConnection;
+
+  /**
+   * Logging level
+   */
+  private final String logLevel;
+
+  /**
+   * The cache instance pointer on this vm
+   */
+  private Cache cache;
+
+  /**
+   * Channel to be closed when shutting down
+   */
+  private Channel serverChannel;
+
+  /**
+   * Gem logwriter
+   */
+  private LogWriter logger;
+
+  private RegionProvider regionCache;
+
+  private final MetaCacheListener metaListener;
+
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
+  private final static int numExpirationThreads = 1;
+  private final ScheduledExecutorService expirationExecutor;
+
+  /**
+   * Map of futures to be executed for key expirations
+   */
+  private final ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationFutures;
+
+
+  /**
+   * The field that defines the name of the {@link Region} which holds all of
+   * the strings. The current value of this field is {@value #STRING_REGION}.
+   */
+  public static final String STRING_REGION = "__StRiNgS";
+
+  /**
+   * The field that defines the name of the {@link Region} which holds all of
+   * the HyperLogLogs. The current value of this field is {@value #HLL_REGION}.
+   */
+  public static final String HLL_REGION = "__HlL";
+
+  /**
+   * The field that defines the name of the {@link Region} which holds all of
+   * the Redis meta data. The current value of this field is {@value #REDIS_META_DATA_REGION}.
+   */
+  public static final String REDIS_META_DATA_REGION = "__ReDiS_MeTa_DaTa";
+
+  /**
+   * The system property name used to set the default {@link Region} creation
+   * type. The property name is {@value #DEFAULT_REGION_SYS_PROP_NAME} and the
+   * acceptable values are types defined by {@link RegionShortcut}, 
+   * i.e. "PARTITION" would be used for {@link RegionShortcut#PARTITION}.
+   */
+  public static final String DEFAULT_REGION_SYS_PROP_NAME = "gemfireredis.regiontype";
+
+  /**
+   * System property name that can be used to set the number of threads to be
+   * used by the GeodeRedisServer
+   */
+  public static final String NUM_THREADS_SYS_PROP_NAME = "gemfireredis.numthreads";
+
+  /**
+   * The actual {@link RegionShortcut} type specified by the system property
+   * {@value #DEFAULT_REGION_SYS_PROP_NAME}.
+   */
+  public final RegionShortcut DEFAULT_REGION_TYPE;
+
+  private boolean shutdown;
+  private boolean started;
+
+  /**
+   * Determine the {@link RegionShortcut} type from a String value.
+   * If the String value doesn't map to a RegionShortcut type then 
+   * {@link RegionShortcut#PARTITION} will be used by default.
+   * 
+   * @return {@link RegionShortcut}
+   */
+  private static RegionShortcut setRegionType() {
+    String regionType = System.getProperty(DEFAULT_REGION_SYS_PROP_NAME, "PARTITION");
+    RegionShortcut type;
+    try {
+      type = RegionShortcut.valueOf(regionType);
+    } catch (Exception e) {
+      type = RegionShortcut.PARTITION;
+    }
+    return type;
+  }
+
+  /**
+   * Helper method to set the number of worker threads
+   * 
+   * @return If the System property {@value #NUM_THREADS_SYS_PROP_NAME} is set then that number
+   * is used, otherwise 4 * # of cores
+   */
+  private int setNumWorkerThreads() {
+    String prop = System.getProperty(NUM_THREADS_SYS_PROP_NAME);
+    int numCores = Runtime.getRuntime().availableProcessors();
+    int def = 4 * numCores;
+    if (prop == null || prop.isEmpty())
+      return def;
+    int threads;
+    try {
+      threads = Integer.parseInt(prop);
+    } catch (NumberFormatException e) {
+      return def;
+    }
+    return threads;
+  }
+
+  /**
+   * Constructor for {@link GeodeRedisServer} that will start the
+   * server on the given port and bind to the first non-loopback address
+   * 
+   * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by default
+   */
+  public GeodeRedisServer(int port) {
+    this(null, port, null);
+  }
+
+  /**
+   * Constructor for {@link GeodeRedisServer} that will start the
+   * server and bind to the given address and port
+   * 
+   * @param bindAddress The address to which the server will attempt to bind
+   * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by default if argument is less than or equal to 0
+   */
+  public GeodeRedisServer(String bindAddress, int port) {
+    this(bindAddress, port, null);
+  }
+
+
+  /**
+   * Constructor for {@link GeodeRedisServer} that will start the
+   * server and bind to the given address and port. Keep in mind that the
+   * log level configuration will only be set if a {@link Cache} does not already
+   * exist; if one already exists, setting that property will have no effect.
+   * 
+   * @param bindAddress The address to which the server will attempt to bind
+   * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by default if argument is less than or equal to 0
+   * @param logLevel The logging level to be used by GemFire
+   */
+  public GeodeRedisServer(String bindAddress, int port, String logLevel) {
+    if (port <= 0)
+      this.serverPort = DEFAULT_REDIS_SERVER_PORT;
+    else
+      this.serverPort = port;
+    this.bindAddress = bindAddress;
+    this.logLevel = logLevel;
+    this.numWorkerThreads = setNumWorkerThreads();
+    if (this.numWorkerThreads == 0)
+      this.singleThreadPerConnection = true;
+    this.numSelectorThreads = 1;
+    this.metaListener = new MetaCacheListener();
+    this.expirationFutures = new ConcurrentHashMap<ByteArrayWrapper, ScheduledFuture<?>>();
+    this.expirationExecutor = Executors.newScheduledThreadPool(numExpirationThreads, new ThreadFactory() {
+      private final AtomicInteger counter = new AtomicInteger();
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(r);
+        t.setName("GemFireRedis-ScheduledExecutor-" + counter.incrementAndGet());
+        t.setDaemon(true);
+        return t;
+      }
+
+    });
+    this.DEFAULT_REGION_TYPE = setRegionType();
+    this.shutdown = false;
+    this.started = false;
+  }
+
+  /**
+   * Helper method to get the host name to bind to
+   * 
+   * @return The InetAddress to bind to
+   * @throws UnknownHostException
+   */
+  private InetAddress getBindAddress() throws UnknownHostException {
+    return this.bindAddress == null || this.bindAddress.isEmpty()
+        ? SocketCreator.getLocalHost()
+            : InetAddress.getByName(this.bindAddress);
+  }
+
+  /**
+   * This is the function to call on a {@link GeodeRedisServer} instance
+   * to start it running
+   */
+  public synchronized void start() {
+    if (!started) {
+      try {
+        startGemFire();
+        initializeRedis();
+        startRedisServer();
+      } catch (IOException e) {
+        throw new RuntimeException("Could not start Server", e);
+      } catch (InterruptedException e) {
+        throw new RuntimeException("Could not start Server", e);
+      }
+      started = true;
+    }
+  }
+
+  /**
+   * Initializes the {@link Cache}, creates the Regions needed by Redis,
+   * and declares those Regions to be protected.
+   * Also, every {@link GeodeRedisServer} will check for entries already in the
+   * meta data Region.
+   */
+  private void startGemFire() {
+    Cache c = GemFireCacheImpl.getInstance();
+    if (c == null) {
+      synchronized (GeodeRedisServer.class) {
+        c = GemFireCacheImpl.getInstance();
+        if (c == null) {
+          CacheFactory cacheFactory = new CacheFactory();
+          if (logLevel != null)
+            cacheFactory.set(LOG_LEVEL, logLevel);
+          c = cacheFactory.create();
+        }
+      }
+    }
+    this.cache = c;
+    this.logger = c.getLogger();
+  }
+
+  private void initializeRedis() {
+    synchronized (this.cache) {
+      RegionFactory<String, RedisDataType> rfMeta = cache.createRegionFactory(RegionShortcut.REPLICATE);
+      rfMeta.addCacheListener(this.metaListener);
+      RegionFactory<ByteArrayWrapper, ByteArrayWrapper> rfString = cache.createRegionFactory(DEFAULT_REGION_TYPE);
+      RegionFactory<ByteArrayWrapper, HyperLogLogPlus> rfHLL = cache.createRegionFactory(DEFAULT_REGION_TYPE);
+      Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion;
+      if ((stringsRegion = this.cache.getRegion(STRING_REGION)) == null)
+        stringsRegion = rfString.create(GeodeRedisServer.STRING_REGION);
+      Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion;
+      if ((hLLRegion = this.cache.getRegion(HLL_REGION)) == null)
+        hLLRegion = rfHLL.create(HLL_REGION);
+      Region<String, RedisDataType> redisMetaData;
+      if ((redisMetaData = this.cache.getRegion(REDIS_META_DATA_REGION)) == null)
+        redisMetaData = rfMeta.create(REDIS_META_DATA_REGION);
+      this.regionCache = new RegionProvider(stringsRegion, hLLRegion, redisMetaData, expirationFutures, expirationExecutor, this.DEFAULT_REGION_TYPE);
+      redisMetaData.put(REDIS_META_DATA_REGION, RedisDataType.REDIS_PROTECTED);
+      redisMetaData.put(HLL_REGION, RedisDataType.REDIS_PROTECTED);
+      redisMetaData.put(STRING_REGION, RedisDataType.REDIS_PROTECTED);
+    }
+    checkForRegions();
+  }
+
+  private void checkForRegions() {
+    Collection<Entry<String, RedisDataType>> entrySet = this.regionCache.metaEntrySet();
+    for (Entry<String, RedisDataType> entry: entrySet) {
+      String regionName = entry.getKey();
+      RedisDataType type = entry.getValue();
+      Region<?, ?> newRegion = cache.getRegion(regionName);
+      if (newRegion == null && type != RedisDataType.REDIS_STRING && type != RedisDataType.REDIS_HLL && type != RedisDataType.REDIS_PROTECTED) {
+        try {
+          this.regionCache.createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(regionName), type);
+        } catch (Exception e) {
+          if (logger.errorEnabled())
+            logger.error(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper method to start the server listening for connections. The
+   * server is bound to the port specified by {@link GeodeRedisServer#serverPort}
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void startRedisServer() throws IOException, InterruptedException {
+    ThreadFactory selectorThreadFactory = new ThreadFactory() {
+      private final AtomicInteger counter = new AtomicInteger();
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(r);
+        t.setName("GeodeRedisServer-SelectorThread-" + counter.incrementAndGet());
+        t.setDaemon(true);
+        return t;
+      }
+
+    };
+
+    ThreadFactory workerThreadFactory = new ThreadFactory() {
+      private final AtomicInteger counter = new AtomicInteger();
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(r);
+        t.setName("GeodeRedisServer-WorkerThread-" + counter.incrementAndGet());
+        return t;
+      }
+
+    };
+
+    bossGroup = null;
+    workerGroup = null;
+    Class<? extends ServerChannel> socketClass = null;
+    if (singleThreadPerConnection) {
+      bossGroup = new OioEventLoopGroup(Integer.MAX_VALUE, selectorThreadFactory);
+      workerGroup = new OioEventLoopGroup(Integer.MAX_VALUE, workerThreadFactory);
+      socketClass = OioServerSocketChannel.class;
+    } else {
+      bossGroup = new NioEventLoopGroup(this.numSelectorThreads, selectorThreadFactory);
+      workerGroup = new NioEventLoopGroup(this.numWorkerThreads, workerThreadFactory);
+      socketClass = NioServerSocketChannel.class;
+    }
+    InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
+    String pwd = system.getConfig().getRedisPassword();
+    final byte[] pwdB = Coder.stringToBytes(pwd);
+    ServerBootstrap b = new ServerBootstrap();
+    b.group(bossGroup, workerGroup)
+    .channel(socketClass)
+    .childHandler(new ChannelInitializer<SocketChannel>() {
+      @Override
+      public void initChannel(SocketChannel ch) throws Exception {
+        if (logger.fineEnabled())
+          logger.fine("GeodeRedisServer-Connection established with " + ch.remoteAddress());
+        ChannelPipeline p = ch.pipeline();
+        p.addLast(ByteToCommandDecoder.class.getSimpleName(), new ByteToCommandDecoder());
+        p.addLast(ExecutionHandlerContext.class.getSimpleName(), new ExecutionHandlerContext(ch, cache, regionCache, GeodeRedisServer.this, pwdB));
+      }
+    })
+    .option(ChannelOption.SO_REUSEADDR, true)
+    .option(ChannelOption.SO_RCVBUF, getBufferSize())
+    .childOption(ChannelOption.SO_KEEPALIVE, true)
+    .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, GeodeRedisServer.connectTimeoutMillis)
+    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+
+    // Bind and start to accept incoming connections.
+    ChannelFuture f = b.bind(new InetSocketAddress(getBindAddress(), serverPort)).sync();
+    if (this.logger.infoEnabled()) {
+      String logMessage = "GeodeRedisServer started {" + getBindAddress() + ":" + serverPort + "}, Selector threads: " + this.numSelectorThreads;
+      if (this.singleThreadPerConnection)
+        logMessage += ", One worker thread per connection";
+      else
+        logMessage += ", Worker threads: " + this.numWorkerThreads;
+      this.logger.info(logMessage);
+    }
+    this.serverChannel = f.channel();
+  }
+
+  /**
+   * Takes an entry event and processes it. If the entry denotes that a
+   * {@link RedisDataType#REDIS_LIST} or {@link RedisDataType#REDIS_SORTEDSET}
+   * was created then this function will make the necessary calls to create the 
+   * parameterized queries for those keys.
+   * 
+   * @param event EntryEvent from meta data region
+   */
+  private void afterKeyCreate(EntryEvent<String, RedisDataType> event) {
+    if (event.isOriginRemote()) {
+      final String key = (String) event.getKey();
+      final RedisDataType value = event.getNewValue();
+      if (value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL && value != RedisDataType.REDIS_PROTECTED) {
+        try {
+          this.regionCache.createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(key), value);
+        } catch (RegionDestroyedException ignore) { // Region already destroyed, ignore
+        }
+      }
+    }
+  }
+
+  /**
+   * When a key is removed then this function will make sure the queries
+   * associated with the key are also removed from each vm to avoid unnecessary
+   * data retention
+   */
+  private void afterKeyDestroy(EntryEvent<String, RedisDataType> event) {
+    if (event.isOriginRemote()) {
+      final String key = (String) event.getKey();
+      final RedisDataType value = event.getOldValue();
+      if (value != null && value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL && value != RedisDataType.REDIS_PROTECTED) {
+        ByteArrayWrapper kW = Coder.stringToByteArrayWrapper(key);
+        Region<?, ?> r = this.regionCache.getRegion(kW);
+        if (r != null) { 
+          this.regionCache.removeRegionReferenceLocally(kW, value);
+        }
+      }
+    }
+  }
+
+  private final class MetaCacheListener extends CacheListenerAdapter<String, RedisDataType> {
+
+    @Override
+    public void afterCreate(EntryEvent<String, RedisDataType> event) {
+      afterKeyCreate(event);
+    }
+
+    @Override
+    public void afterDestroy(EntryEvent<String, RedisDataType> event) {
+      afterKeyDestroy(event);
+    }
+  }
+
+  /**
+   * Helper method to get the GemFire-configured socket buffer size,
+   * which defaults to 32k
+   * 
+   * @return Buffer size to use for server
+   */
+  private int getBufferSize() {
+    InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
+    return system.getConfig().getSocketBufferSize();
+  }
+
+  /**
+   * Shutdown method for {@link GeodeRedisServer}. This closes the {@link Cache},
+   * interrupts all execution and forcefully closes all connections.
+   */
+  public synchronized void shutdown() {
+    if (!shutdown) {
+      if (logger.infoEnabled())
+        logger.info("GeodeRedisServer shutting down");
+      ChannelFuture closeFuture = this.serverChannel.closeFuture();
+      Future<?> c = workerGroup.shutdownGracefully();
+      Future<?> c2 = bossGroup.shutdownGracefully();
+      this.serverChannel.close();
+      c.syncUninterruptibly();
+      c2.syncUninterruptibly();
+      this.regionCache.close();
+      if (mainThread != null)
+        mainThread.interrupt();
+      for (ScheduledFuture<?> f : this.expirationFutures.values())
+        f.cancel(true);
+      this.expirationFutures.clear();
+      this.expirationExecutor.shutdownNow();
+      closeFuture.syncUninterruptibly();
+      shutdown = true;
+    }
+  }
+
+  /**
+   * Static main method that allows the {@link GeodeRedisServer} to be
+   * started from the command line. The supported command line arguments are
+   * <p>-port=
+   * <br>-bind-address=
+   * <br>-log-level=
+   * 
+   * @param args Command line args
+   */
+  public static void main(String[] args) {
+    int port = DEFAULT_REDIS_SERVER_PORT;
+    String bindAddress = null;
+    String logLevel = null;
+    for (String arg: args) {
+      if (arg.startsWith("-port"))
+        port = getPort(arg);
+      else if (arg.startsWith("-bind-address"))
+        bindAddress = getBindAddress(arg);
+      else if (arg.startsWith("-log-level"))
+        logLevel = getLogLevel(arg);
+    }
+    mainThread = Thread.currentThread();
+    GeodeRedisServer server = new GeodeRedisServer(bindAddress, port, logLevel);
+    server.start();
+    while(true) {
+      try {
+        Thread.sleep(Long.MAX_VALUE);
+      } catch (InterruptedException e1) { 
+        break;
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  /**
+   * Helper method to parse the port to a number
+   * 
+   * @param arg String where the argument is
+   * @return The port number when the correct syntax was used, 
+   * otherwise will return {@link #DEFAULT_REDIS_SERVER_PORT}
+   */
+  private static int getPort(String arg) {
+    int port = DEFAULT_REDIS_SERVER_PORT;
+    if (arg != null && arg.length() > 6) {
+      if (arg.startsWith("-port")) {
+        String p = arg.substring(arg.indexOf('=') + 1);
+        p = p.trim();
+        try {
+          port = Integer.parseInt(p);
+        } catch (NumberFormatException e) {
+          System.out.println("Unable to parse port, using default port");
+        }
+      }
+    }
+    return port;
+  }
+
+  /**
+   * Helper method to parse bind address
+   * 
+   * @param arg String holding bind address
+   * @return Bind address
+   */
+  private static String getBindAddress(String arg) {
+    String address = null;
+    if (arg != null && arg.length() > 14) {
+      if (arg.startsWith("-bind-address")) {
+        String p = arg.substring(arg.indexOf('=') + 1);
+        address = p.trim();
+      }
+    }
+    return address;
+  }
+
+  /**
+   * Helper method to parse log level
+   * 
+   * @param arg String holding log level
+   * @return Log level
+   */
+  private static String getLogLevel(String arg) {
+    String logLevel = null;
+    if (arg != null && arg.length() > 11) {
+      if (arg.startsWith("-log-level")) {
+        String p = arg.substring(arg.indexOf('=') + 1);
+        logLevel = p.trim();
+      }
+    }
+    return logLevel;
+  }
+
+}
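
The renamed class keeps the same command-line entry point. Below is a hedged launch sketch using the
-port=, -bind-address= and -log-level= arguments parsed by the static main above; the launcher class
here is illustrative only, and a shell launcher would pass the same strings directly:

    // Illustrative only: drives the renamed server's static main with the argument
    // syntax parsed by getPort/getBindAddress/getLogLevel above.
    import org.apache.geode.redis.GeodeRedisServer;

    public class LaunchGeodeRedis {
      public static void main(String[] args) {
        // main() sleeps until its thread is interrupted (e.g. by shutdown()).
        GeodeRedisServer.main(new String[] {
            "-port=6379",
            "-bind-address=127.0.0.1",
            "-log-level=info"
        });
      }
    }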

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfd481e0/geode-core/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java b/geode-core/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java
new file mode 100755
index 0000000..f991914
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/redis/internal/ByteArrayWrapper.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.redis.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
+
+/**
+ * This class is a wrapper used by any Region that needs to store a 
+ * byte[]. The only data an instance stores is that byte[], but the wrapper
+ * is also serializable and comparable so it can be used in querying
+ */
+public class ByteArrayWrapper implements DataSerializable, Comparable<ByteArrayWrapper> {
+  /**
+   * Generated serialVersionUID
+   */
+  private static final long serialVersionUID = 9066391742266642992L;
+
+  /**
+   * The byte[] data wrapped by this ByteArrayWrapper
+   */
+  private byte[] value;
+
+  /**
+   * Hash of {@link #value}, this value is cached for performance
+   */
+  private transient int hashCode;
+
+  private transient String toString;
+
+  /**
+   * Empty constructor for serialization
+   */
+  public ByteArrayWrapper() {
+  }
+
+  /**
+   * Constructs a ByteArrayWrapper and initializes {@link #value}
+   * 
+   * @param value Bytes to wrap
+   */
+  public ByteArrayWrapper(byte[] value) {
+    this.value = value;
+    this.hashCode = Arrays.hashCode(value);
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeByteArray(value, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.value = DataSerializer.readByteArray(in);
+    this.hashCode = Arrays.hashCode(this.value);
+  }
+
+  @Override
+  public String toString() {
+    if (toString == null)
+      toString = Coder.bytesToString(this.value);
+    return toString;
+  }
+
+  public byte[] toBytes() {
+    return this.value;
+  }
+
+  public void setBytes(byte[] bytes) {
+    this.value = bytes;
+    this.toString = null;
+    this.hashCode = Arrays.hashCode(bytes);
+  }
+
+  /**
+   * Getter for the length of the {@link #value} array
+   * @return The length of the value array
+   */
+  public int length() {
+    return value.length;
+  }
+
+  /**
+   * Hash code for the byte[] wrapped by this object;
+   * the actual hash code is determined by Arrays.hashCode(byte[])
+   */
+  @Override
+  public int hashCode() {
+    return this.hashCode;
+  }
+
+
+  /**
+   * This equals is not symmetric, and therefore not transitive, 
+   * because a String with the same underlying bytes is considered
+   * equal; calling {@link String#equals(Object)} on that String would 
+   * not yield the same result
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof ByteArrayWrapper)
+      return Arrays.equals(value, ((ByteArrayWrapper) other).value);
+    else if (other instanceof String) {
+      return Arrays.equals(value, Coder.stringToBytes((String) other));
+    }
+    return false;
+  }
+
+  /**
+   * This is a byte-to-byte comparator; it is not lexicographical in the String
+   * sense but purely compares byte values, index by index
+   */
+  @Override
+  public int compareTo(ByteArrayWrapper other) {
+    return arrayCmp(value, other.value);
+
+  }
+
+  /**
+   * Private helper method to compare two byte arrays, A.compareTo(B). The comparison
+   * is numerical: at the first index where the arrays differ, the array holding the
+   * greater byte value is the greater array; if one array is a prefix of the other,
+   * the longer array is the greater
+   * 
+   * @param A byte[]
+   * @param B byte[]
+   * @return 1 if A > B, -1 if B > A, 0 if A == B
+   */
+  private int arrayCmp(byte[] A, byte[] B) {
+    if (A == B)
+      return 0;
+    if (A == null) {
+      return -1;
+    } else if (B == null) {
+      return 1;
+    }
+
+    int len = Math.min(A.length, B.length);
+
+    for (int i = 0; i < len; i++) {
+      byte a = A[i];
+      byte b = B[i];
+      int diff = a - b;
+      if (diff > 0)
+        return 1;
+      else if (diff < 0)
+        return -1;
+    }
+
+    if (A.length > B.length)
+      return 1;
+    else if (B.length > A.length)
+      return -1;
+
+    return 0;
+  }
+
+}
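
The non-symmetric equals() and the byte-by-byte compareTo() described above can be seen
in a short, illustrative sketch; the example class name is hypothetical, but the
ByteArrayWrapper and Coder calls are exactly those introduced in this diff.

  import org.apache.geode.redis.internal.ByteArrayWrapper;
  import org.apache.geode.redis.internal.Coder;

  public class ByteArrayWrapperExample {
    public static void main(String[] args) {
      ByteArrayWrapper abc = new ByteArrayWrapper(Coder.stringToBytes("abc"));
      ByteArrayWrapper abd = new ByteArrayWrapper(Coder.stringToBytes("abd"));

      // equals() is not symmetric: the wrapper compares its bytes against the String,
      // but String.equals() knows nothing about ByteArrayWrapper.
      System.out.println(abc.equals("abc"));   // true
      System.out.println("abc".equals(abc));   // false

      // compareTo() is a pure byte-by-byte comparison: 'c' < 'd' at index 2.
      System.out.println(abc.compareTo(abd));  // -1
    }
  }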

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfd481e0/geode-core/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java b/geode-core/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java
new file mode 100644
index 0000000..5a93065
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/redis/internal/ByteToCommandDecoder.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.redis.internal;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is the first part of the channel pipeline for Netty. Here incoming
+ * bytes are read and the resulting {@link Command} is sent down the pipeline.
+ * It is unfortunate that this class cannot be {@link io.netty.channel.ChannelHandler.Sharable},
+ * even though no state is kept in this class itself; state is kept by
+ * {@link ByteToMessageDecoder}, so it may be worthwhile to look at a different decoder
+ * setup to avoid allocating a decoder for every new connection.
+ * <p>
+ * The code flow of the protocol parsing may not look very Java-like, but this is 
+ * intentional. It was found that when large Redis requests end up fragmented across
+ * reads, throwing exceptions whenever a command could not be fully parsed consumed an
+ * enormous amount of CPU time. The simplicity of the Redis protocol allows us to simply
+ * back out and wait for more data, leaving exceptions to malformed requests, which 
+ * should never happen when using a proper Redis client.
+ */
+public class ByteToCommandDecoder extends ByteToMessageDecoder {
+
+  /**
+   * Important note
+   * 
+   * Do not use Java char literals here. Redis uses {@link Coder#CHARSET}
+   * encoding, so rather than risk Java handling the char-to-byte conversion, 
+   * the {@link Coder#CHARSET} characters are hard coded as bytes
+   */
+  
+  private static final byte rID = 13; // '\r';
+  private static final byte nID = 10; // '\n';
+  private static final byte bulkStringID = 36; // '$';
+  private static final byte arrayID = 42; // '*';
+  private static final int MAX_BULK_STRING_LENGTH = 512 * 1024 * 1024; // 512 MB
+  
+  public ByteToCommandDecoder() {
+  }
+
+  @Override
+  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+    Command c = null;
+    do {
+      in.markReaderIndex();
+      c = parse(in);
+      if (c == null) {
+        in.resetReaderIndex();
+        return;
+      }
+      out.add(c);
+    } while (in.isReadable()); // Try to take advantage of pipelining if it is being used
+  }
+
+  private Command parse(ByteBuf buffer) throws RedisCommandParserException {
+    if (buffer == null)
+      throw new NullPointerException();
+    if (!buffer.isReadable())
+      return null;
+
+    byte firstB = buffer.readByte();
+    if (firstB != arrayID)
+      throw new RedisCommandParserException("Expected: " + (char) arrayID + " Actual: " + (char) firstB);
+    ArrayList<byte[]> commandElems = new ArrayList<byte[]>();
+
+    if (!parseArray(commandElems, buffer))
+      return null;
+
+    return new Command(commandElems);
+  }
+
+  private boolean parseArray(ArrayList<byte[]> commandElems, ByteBuf buffer) throws RedisCommandParserException { 
+    byte currentChar;
+    int arrayLength = parseCurrentNumber(buffer);
+    if (arrayLength == Integer.MIN_VALUE || !parseRN(buffer))
+      return false;
+    if (arrayLength < 0 || arrayLength > 1000000000)
+      throw new RedisCommandParserException("invalid multibulk length");
+
+    for (int i = 0; i < arrayLength; i++) {
+      if (!buffer.isReadable())
+        return false;
+      currentChar = buffer.readByte();
+      if (currentChar == bulkStringID) {
+        byte[] newBulkString = parseBulkString(buffer);
+        if (newBulkString == null)
+          return false;
+        commandElems.add(newBulkString);
+      } else
+        throw new RedisCommandParserException("expected: \'$\', got \'" + (char) currentChar + "\'");
+    }
+    return true;
+  }
+
+  /**
+   * Helper method to parse a bulk string when one is seen
+   * 
+   * @param buffer Buffer to read from
+   * @return byte[] representation of the Bulk String read
+   * @throws RedisCommandParserException Thrown when there is illegal syntax
+   */
+  private byte[] parseBulkString(ByteBuf buffer) throws RedisCommandParserException {
+    int bulkStringLength = parseCurrentNumber(buffer);
+    if (bulkStringLength == Integer.MIN_VALUE)
+      return null;
+    if (bulkStringLength > MAX_BULK_STRING_LENGTH)
+      throw new RedisCommandParserException("invalid bulk length, cannot exceed max length of " + MAX_BULK_STRING_LENGTH);
+    if (!parseRN(buffer))
+      return null;
+
+    if (!buffer.isReadable(bulkStringLength))
+      return null;
+    byte[] bulkString = new byte[bulkStringLength];
+    buffer.readBytes(bulkString);
+
+    if (!parseRN(buffer))
+      return null;
+
+    return bulkString;
+  }
+
+  /**
+   * Helper method to parse the number at the beginning of the buffer
+   * 
+   * @param buffer Buffer to read
+   * @return The number found at the beginning of the buffer, or
+   * Integer.MIN_VALUE if the buffer does not yet hold a complete number
+   */
+  private int parseCurrentNumber(ByteBuf buffer) {
+    int number = 0;
+    int readerIndex = buffer.readerIndex();
+    byte b = 0;
+    while (true) {
+      if (!buffer.isReadable())
+        return Integer.MIN_VALUE;
+      b = buffer.readByte();
+      if (Character.isDigit(b)) {
+        number = number * 10 + (int) (b - '0');
+        readerIndex++;
+      } else {
+        buffer.readerIndex(readerIndex);
+        break;
+      }
+    }
+    return number;
+  }
+
+  /**
+   * Helper method that is called when the next characters are 
+   * supposed to be "\r\n"
+   * 
+   * @param buffer Buffer to read from
+   * @return True when "\r\n" was read, false if more data is needed
+   * @throws RedisCommandParserException Thrown when the next two characters
+   * are not "\r\n"
+   */
+  private boolean parseRN(ByteBuf buffer) throws RedisCommandParserException {
+    if (!buffer.isReadable(2))
+      return false;
+    byte b = buffer.readByte();
+    if (b != rID)
+      throw new RedisCommandParserException("expected \'" + (char) rID + "\', got \'" + (char) b + "\'");
+    b = buffer.readByte();
+    if (b != nID)
+      throw new RedisCommandParserException("expected: \'" + (char) nID + "\', got \'" + (char) b + "\'");
+    return true;
+  }
+
+}
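
The back-out-and-wait behavior described in the class comment can be exercised with
Netty's EmbeddedChannel. The sketch below is illustrative only (the example class name
is hypothetical); it feeds a RESP request in two fragments and shows that no Command is
emitted until the request is complete.

  import io.netty.buffer.Unpooled;
  import io.netty.channel.embedded.EmbeddedChannel;
  import io.netty.util.CharsetUtil;
  import org.apache.geode.redis.internal.ByteToCommandDecoder;

  public class DecoderFragmentationExample {
    public static void main(String[] args) {
      EmbeddedChannel channel = new EmbeddedChannel(new ByteToCommandDecoder());

      // First fragment of "SET k v", cut off in the middle of the command.
      channel.writeInbound(Unpooled.copiedBuffer("*3\r\n$3\r\nSET\r\n$1\r\nk", CharsetUtil.UTF_8));
      Object first = channel.readInbound();
      System.out.println(first);           // null: the decoder backs out and waits for more bytes

      // The remaining bytes complete the request and a Command is produced.
      channel.writeInbound(Unpooled.copiedBuffer("\r\n$1\r\nv\r\n", CharsetUtil.UTF_8));
      Object second = channel.readInbound();
      System.out.println(second != null);  // true: a parsed Command for SET k v
    }
  }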

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfd481e0/geode-core/src/main/java/org/apache/geode/redis/internal/Coder.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/redis/internal/Coder.java b/geode-core/src/main/java/org/apache/geode/redis/internal/Coder.java
new file mode 100644
index 0000000..94b6dd5
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/redis/internal/Coder.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.redis.internal;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+import java.io.UnsupportedEncodingException;
+import java.text.DecimalFormat;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.gemstone.gemfire.cache.EntryDestroyedException;
+import com.gemstone.gemfire.cache.query.Struct;
+
+/**
+ * This is a safe encoder and decoder for all Redis protocol encoding and
+ * decoding needs
+ */
+public class Coder {
+
+  /*
+   * Take no chances on char to byte conversions with default charsets on jvms, 
+   * so we'll hard code the UTF-8 symbol values as bytes here
+   */
+
+
+  /**
+   * byte identifier of a bulk string
+   */
+  public static final byte BULK_STRING_ID = 36; // '$'
+
+  /**
+   * byte identifier of an array
+   */
+  public static final byte ARRAY_ID = 42; // '*'
+
+  /**
+   * byte identifier of an error
+   */
+  public static final byte ERROR_ID = 45; // '-'
+
+  /**
+   * byte identifier of an integer
+   */
+  public static final byte INTEGER_ID = 58; // ':'
+
+  public static final byte OPEN_BRACE_ID = 0x28; // '('
+  public static final byte OPEN_BRACKET_ID = 0x5b; // '['
+  public static final byte HYPHEN_ID = 0x2d; // '-'
+  public static final byte PLUS_ID = 0x2b; // '+'
+  public static final byte NUMBER_1_BYTE = 0x31; // '1'
+  /**
+   * byte identifier of a simple string
+   */
+  public static final byte SIMPLE_STRING_ID = 43; // '+'
+  public static final String CRLF = "\r\n";
+  public static final byte[] CRLFar = stringToBytes(CRLF); // {13, 10} == {'\r', '\n'}
+
+  /**
+   * byte array of a nil response
+   */
+  public static final byte[] bNIL = stringToBytes("$-1\r\n"); // {'$', '-', '1', '\r', '\n'};
+
+  /**
+   * byte array of an empty string
+   */
+  public static final byte[] bEMPTY_ARRAY = stringToBytes("*0\r\n"); // {'*', '0', '\r', '\n'};
+
+  public static final byte[] err = stringToBytes("ERR ");
+  public static final byte[] noAuth = stringToBytes("NOAUTH ");
+  public static final byte[] wrongType = stringToBytes("WRONGTYPE ");
+
+  /**
+   * The charset being used by this coder, {@value #CHARSET}.
+   */
+  public static final String CHARSET = "UTF-8";
+
+  protected static final DecimalFormat decimalFormatter = new DecimalFormat("#");
+  static {
+    decimalFormatter.setMaximumFractionDigits(10);
+  }
+
+  /**
+   * Positive infinity string
+   */
+  public static final String P_INF = "+inf";
+
+  /**
+   * Negative infinity string
+   */
+  public static final String N_INF = "-inf";
+
+  public static final ByteBuf getBulkStringResponse(ByteBufAllocator alloc, byte[] value) {
+    ByteBuf response = alloc.buffer(value.length + 20);
+    response.writeByte(BULK_STRING_ID);
+    response.writeBytes(intToBytes(value.length));
+    response.writeBytes(CRLFar);
+    response.writeBytes(value);
+    response.writeBytes(CRLFar);
+    return response;
+  }
+
+  public static final ByteBuf getBulkStringResponse(ByteBufAllocator alloc, double value) {
+    ByteBuf response = alloc.buffer();
+    byte[] doub = doubleToBytes(value);
+    response.writeByte(BULK_STRING_ID);
+    response.writeBytes(intToBytes(doub.length));
+    response.writeBytes(CRLFar);
+    response.writeBytes(doub);
+    response.writeBytes(CRLFar);
+    return response;
+  }
+
+  public static final ByteBuf getBulkStringResponse(ByteBufAllocator alloc, String value) {
+    byte[] valueAr = stringToBytes(value);
+    int length = valueAr == null ? 0 : valueAr.length;
+    ByteBuf response = alloc.buffer(length + 20);
+    response.writeByte(BULK_STRING_ID);
+    response.writeBytes(intToBytes(length));
+    response.writeBytes(CRLFar);
+    response.writeBytes(valueAr);
+    response.writeBytes(CRLFar);
+    return response;
+  }
+
+  public static final ByteBuf getBulkStringArrayResponse(ByteBufAllocator alloc, List<String> items) {
+    Iterator<String> it = items.iterator();
+    ByteBuf response = alloc.buffer();
+    response.writeByte(ARRAY_ID);
+    response.writeBytes(intToBytes(items.size()));
+    response.writeBytes(CRLFar);
+    while(it.hasNext()) {
+      String next = it.next();
+      response.writeByte(BULK_STRING_ID);
+      response.writeBytes(intToBytes(next.length()));
+      response.writeBytes(CRLFar);
+      response.writeBytes(stringToBytes(next));
+      response.writeBytes(CRLFar);
+    }
+    return response;
+  }
+
+  public static final ByteBuf getBulkStringArrayResponse(ByteBufAllocator alloc, Collection<ByteArrayWrapper> items) {
+    Iterator<ByteArrayWrapper> it = items.iterator();
+    ByteBuf response = alloc.buffer();
+    response.writeByte(ARRAY_ID);
+    response.writeBytes(intToBytes(items.size()));
+    response.writeBytes(CRLFar);
+    while(it.hasNext()) {
+      ByteArrayWrapper nextWrapper = it.next();
+      if (nextWrapper != null) {
+        response.writeByte(BULK_STRING_ID);
+        response.writeBytes(intToBytes(nextWrapper.length()));
+        response.writeBytes(CRLFar);
+        response.writeBytes(nextWrapper.toBytes());
+        response.writeBytes(CRLFar);
+      } else
+        response.writeBytes(getNilResponse(alloc));
+    }
+
+    return response;
+  }
+
+  public static final ByteBuf getKeyValArrayResponse(ByteBufAllocator alloc, Collection<Entry<ByteArrayWrapper, ByteArrayWrapper>> items) {
+    Iterator<Map.Entry<ByteArrayWrapper,ByteArrayWrapper>> it = items.iterator();
+    ByteBuf response = alloc.buffer();
+    response.writeByte(ARRAY_ID);
+
+    int size = 0;
+    ByteBuf tmp = alloc.buffer();
+    while(it.hasNext()) {
+      Map.Entry<ByteArrayWrapper,ByteArrayWrapper> next = it.next();
+      byte[] key;
+      byte[] nextByteArray;
+      try {
+        key = next.getKey().toBytes();
+        nextByteArray = next.getValue().toBytes();
+      } catch (EntryDestroyedException e) {
+        continue;
+      }
+      tmp.writeByte(BULK_STRING_ID); // Add key
+      tmp.writeBytes(intToBytes(key.length));
+      tmp.writeBytes(CRLFar);
+      tmp.writeBytes(key);
+      tmp.writeBytes(CRLFar);
+      tmp.writeByte(BULK_STRING_ID); // Add value
+      tmp.writeBytes(intToBytes(nextByteArray.length));
+      tmp.writeBytes(CRLFar);
+      tmp.writeBytes(nextByteArray);
+      tmp.writeBytes(CRLFar);
+      size++;
+    }
+
+    response.writeBytes(intToBytes(size*2));
+    response.writeBytes(CRLFar);
+    response.writeBytes(tmp);
+
+    tmp.release();
+
+    return response;
+  }
+
+  public static final ByteBuf getScanResponse(ByteBufAllocator alloc, List<?> items) {
+    ByteBuf response = alloc.buffer();
+    response.writeByte(ARRAY_ID);
+    response.writeBytes(intToBytes(2));
+    response.writeBytes(CRLFar);
+    response.writeByte(BULK_STRING_ID);
+    byte[] cursor = stringToBytes((String) items.get(0));
+    response.writeBytes(intToBytes(cursor.length));
+    response.writeBytes(CRLFar);
+    response.writeBytes(cursor);
+    response.writeBytes(CRLFar);
+    items = items.subList(1, items.size());
+    Iterator<?> it = items.iterator();
+    response.writeByte(ARRAY_ID);
+    response.writeBytes(intToBytes(items.size()));
+    response.writeBytes(CRLFar);
+
+    while(it.hasNext()) {
+      Object nextObject = it.next();
+      if (nextObject instanceof String) {
+        String next = (String) nextObject;
+        response.writeByte(BULK_STRING_ID);
+        response.writeBytes(intToBytes(next.length()));
+        response.writeBytes(CRLFar);
+        response.writeBytes(stringToBytes(next));
+        response.writeBytes(CRLFar);
+      } else if (nextObject instanceof ByteArrayWrapper) {
+        byte[] next = ((ByteArrayWrapper) nextObject).toBytes();
+        response.writeByte(BULK_STRING_ID);
+        response.writeBytes(intToBytes(next.length));
+        response.writeBytes(CRLFar);
+        response.writeBytes(next);
+        response.writeBytes(CRLFar);
+      }
+    }
+    return response;
+  }
+
+  public static final ByteBuf getEmptyArrayResponse(ByteBufAllocator alloc) {
+    ByteBuf buf = alloc.buffer().writeBytes(bEMPTY_ARRAY);
+    return buf;
+  }
+
+  public static final ByteBuf getSimpleStringResponse(ByteBufAllocator alloc, String string) {
+    byte[] simpAr = stringToBytes(string);
+
+    ByteBuf response = alloc.buffer(simpAr.length + 20);
+    response.writeByte(SIMPLE_STRING_ID);
+    response.writeBytes(simpAr);
+    response.writeBytes(CRLFar);
+    return response;
+  }
+
+  public static final ByteBuf getErrorResponse(ByteBufAllocator alloc, String error) {
+    byte[] errorAr = stringToBytes(error);
+    ByteBuf response = alloc.buffer(errorAr.length + 25);
+    response.writeByte(ERROR_ID);
+    response.writeBytes(err);
+    response.writeBytes(errorAr);
+    response.writeBytes(CRLFar);
+    return response;
+  }
+
+  public static final ByteBuf getNoAuthResponse(ByteBufAllocator alloc, String error) {
+    byte[] errorAr = stringToBytes(error);
+    ByteBuf response = alloc.buffer(errorAr.length + 25);
+    response.writeByte(ERROR_ID);
+    response.writeBytes(noAuth);
+    response.writeBytes(errorAr);
+    response.writeBytes(CRLFar);
+    return response;
+  }
+
+  public static final ByteBuf getWrongTypeResponse(ByteBufAllocator alloc, String error) {
+    byte[] errorAr = stringToBytes(error);
+    ByteBuf response = alloc.buffer(errorAr.length + 31);
+    response.writeByte(ERROR_ID);
+    response.writeBytes(wrongType);
+    response.writeBytes(errorAr);
+    response.writeBytes(CRLFar);
+    return response;
+  }
+
+  public static final ByteBuf getIntegerResponse(ByteBufAllocator alloc, int integer) {
+    ByteBuf response = alloc.buffer(15);
+    response.writeByte(INTEGER_ID);
+    response.writeBytes(intToBytes(integer));
+    response.writeBytes(CRLFar);
+    return response;
+  }
+
+  public static final ByteBuf getIntegerResponse(ByteBufAllocator alloc, long l) {
+    ByteBuf response = alloc.buffer(25);
+    response.writeByte(INTEGER_ID);
+    response.writeBytes(longToBytes(l));
+    response.writeBytes(CRLFar);
+    return response;
+  }
+
+  public static final ByteBuf getNilResponse(ByteBufAllocator alloc) {
+    ByteBuf buf = alloc.buffer().writeBytes(bNIL);
+    return buf;
+  }
+
+  public static ByteBuf getBulkStringArrayResponseOfValues(ByteBufAllocator alloc, Collection<?> items) {
+    Iterator<?> it = items.iterator();
+    ByteBuf response = alloc.buffer();
+    response.writeByte(Coder.ARRAY_ID);
+    ByteBuf tmp = alloc.buffer();
+    int size = 0;
+    while(it.hasNext()) {
+      Object next = it.next();
+      ByteArrayWrapper nextWrapper = null;
+      if (next instanceof Entry) {
+        try {
+          nextWrapper = (ByteArrayWrapper) ((Entry<?, ?>) next).getValue();
+        } catch (EntryDestroyedException e) {
+          continue;
+        }
+      } else if (next instanceof Struct) {
+        nextWrapper = (ByteArrayWrapper) ((Struct) next).getFieldValues()[1];
+      }
+      if (nextWrapper != null) {
+        tmp.writeByte(Coder.BULK_STRING_ID);
+        tmp.writeBytes(intToBytes(nextWrapper.length()));
+        tmp.writeBytes(Coder.CRLFar);
+        tmp.writeBytes(nextWrapper.toBytes());
+        tmp.writeBytes(Coder.CRLFar);
+      } else {
+        tmp.writeBytes(Coder.bNIL);
+      }
+      size++;
+    }
+
+    response.writeBytes(intToBytes(size));
+    response.writeBytes(Coder.CRLFar);
+    response.writeBytes(tmp);
+
+    tmp.release();
+
+    return response;
+  }
+
+  public static ByteBuf zRangeResponse(ByteBufAllocator alloc, Collection<?> list, boolean withScores) {
+    if (list.isEmpty())
+      return Coder.getEmptyArrayResponse(alloc);
+
+    ByteBuf buffer = alloc.buffer();
+    buffer.writeByte(Coder.ARRAY_ID);
+    ByteBuf tmp = alloc.buffer();
+    int size = 0;
+
+    for(Object entry: list) {
+      ByteArrayWrapper key;
+      DoubleWrapper score;
+      if (entry instanceof Entry) {
+        try {
+          key = (ByteArrayWrapper) ((Entry<?, ?>) entry).getKey();
+          score = (DoubleWrapper) ((Entry<?, ?>) entry).getValue();
+        } catch (EntryDestroyedException e) {
+          continue;
+        }
+      } else {
+        Object[] fieldVals = ((Struct) entry).getFieldValues();
+        key = (ByteArrayWrapper) fieldVals[0];
+        score = (DoubleWrapper) fieldVals[1];
+      }
+      byte[] byteAr = key.toBytes();
+      tmp.writeByte(Coder.BULK_STRING_ID);
+      tmp.writeBytes(intToBytes(byteAr.length));
+      tmp.writeBytes(Coder.CRLFar);
+      tmp.writeBytes(byteAr);
+      tmp.writeBytes(Coder.CRLFar);
+      size++;
+      if (withScores) {
+        String scoreString = score.toString();
+        byte[] scoreAr = stringToBytes(scoreString);
+        tmp.writeByte(Coder.BULK_STRING_ID);
+        tmp.writeBytes(intToBytes(scoreString.length()));
+        tmp.writeBytes(Coder.CRLFar);
+        tmp.writeBytes(scoreAr);
+        tmp.writeBytes(Coder.CRLFar);
+        size++;
+      }
+    }
+
+    buffer.writeBytes(intToBytes(size));
+    buffer.writeBytes(Coder.CRLFar);
+    buffer.writeBytes(tmp);
+
+    tmp.release();
+
+    return buffer;
+  }
+
+  public static ByteBuf getArrayOfNils(ByteBufAllocator alloc, int length) {
+    ByteBuf response = alloc.buffer();
+    response.writeByte(Coder.ARRAY_ID);
+    response.writeBytes(intToBytes(length));
+    response.writeBytes(Coder.CRLFar);
+
+    for (int i = 0; i < length; i++)
+      response.writeBytes(bNIL);
+
+    return response;
+  }
+
+  public static String bytesToString(byte[] bytes) {
+    if (bytes == null)
+      return null;
+    try {
+      return new String(bytes, CHARSET).intern();
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static String doubleToString(double d) {
+    if (d == Double.POSITIVE_INFINITY)
+      return "Infinity";
+    else if (d == Double.NEGATIVE_INFINITY)
+      return "-Infinity";
+    return String.valueOf(d);
+  }
+
+  public static byte[] stringToBytes(String string) {
+    if (string == null || string.equals(""))
+      return null;
+    try {
+      return string.getBytes(CHARSET);
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static ByteArrayWrapper stringToByteArrayWrapper(String s) {
+    return new ByteArrayWrapper(stringToBytes(s));
+  }
+
+  /*
+   * These toBytes methods convert the input to the byte array of its
+   * string representation, not to its literal byte representation
+   */
+
+  public static byte[] intToBytes(int i) {
+    return stringToBytes(String.valueOf(i));
+  }
+
+  public static byte[] longToBytes(long l) {
+    return stringToBytes(String.valueOf(l));
+  }
+
+  public static byte[] doubleToBytes(double d) {
+    return stringToBytes(doubleToString(d));
+  }
+
+  public static int bytesToInt(byte[] bytes) {
+    return Integer.parseInt(bytesToString(bytes));
+  }
+
+  public static long bytesToLong(byte[] bytes) {
+    return Long.parseLong(bytesToString(bytes));
+  }
+
+  /**
+   * A conversion where the byte array actually represents a string,
+   * so it is converted as a string not as a literal double
+   * @param bytes Array holding double
+   * @return Parsed value
+   * @throws NumberFormatException if bytes to string does not yield a convertible double
+   */
+  public static Double bytesToDouble(byte[] bytes) {
+    return stringToDouble(bytesToString(bytes));
+  }
+
+  /**
+   * Redis specific manner to parse floats
+   * @param d String holding double
+   * @return Value of string
+   * @throws NumberFormatException if the double cannot be parsed
+   */
+  public static double stringToDouble(String d) {
+    if (d.equalsIgnoreCase(P_INF))
+      return Double.POSITIVE_INFINITY;
+    else if (d.equalsIgnoreCase(N_INF))
+      return Double.NEGATIVE_INFINITY;
+    else
+      return Double.parseDouble(d);
+  }
+
+  public static ByteArrayWrapper stringToByteWrapper(String s) {
+    return new ByteArrayWrapper(stringToBytes(s));
+  }
+
+}
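
To make the RESP wire format produced by Coder concrete, the following illustrative
sketch (hypothetical class name, Netty's default allocator assumed purely for the
example) prints a few of the responses built by the methods above.

  import io.netty.buffer.ByteBuf;
  import io.netty.buffer.ByteBufAllocator;
  import io.netty.util.CharsetUtil;
  import org.apache.geode.redis.internal.Coder;

  public class CoderWireFormatExample {
    public static void main(String[] args) {
      ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;

      // Bulk string reply: "$5\r\nhello\r\n"
      ByteBuf bulk = Coder.getBulkStringResponse(alloc, Coder.stringToBytes("hello"));
      System.out.println(bulk.toString(CharsetUtil.UTF_8));
      bulk.release();

      // Integer reply: ":42\r\n"
      ByteBuf integer = Coder.getIntegerResponse(alloc, 42);
      System.out.println(integer.toString(CharsetUtil.UTF_8));
      integer.release();

      // Error reply: "-ERR unknown command\r\n"
      ByteBuf error = Coder.getErrorResponse(alloc, "unknown command");
      System.out.println(error.toString(CharsetUtil.UTF_8));
      error.release();
    }
  }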

