geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [022/100] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917
Date Mon, 22 Feb 2016 21:43:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/hll/IBuilder.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/hll/IBuilder.java
index 0000000,0000000..0e74fe6
new file mode 100755
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/hll/IBuilder.java
@@@ -1,0 -1,0 +1,24 @@@
++/*
++ * Copyright (C) 2011 Clearspring Technologies, Inc.
++ *
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package com.gemstone.gemfire.internal.hll;
++
++
++public interface IBuilder<T> {
++
++    T build();
++
++    int sizeof();
++}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/hll/ICardinality.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/hll/ICardinality.java
index 0000000,0000000..fd32154
new file mode 100755
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/hll/ICardinality.java
@@@ -1,0 -1,0 +1,72 @@@
++/*
++ * Copyright (C) 2011 Clearspring Technologies, Inc.
++ *
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package com.gemstone.gemfire.internal.hll;
++
++
++import java.io.IOException;
++
++
++public interface ICardinality {
++
++    /**
++     * @param o stream element
++     * @return false if the value returned by cardinality() is unaffected by the appearance of o in the stream.
++     */
++    boolean offer(Object o);
++
++    /**
++     * Offer the value as a hashed long value
++     *
++     * @param hashedLong - the hash of the item to offer to the estimator
++     * @return false if the value returned by cardinality() is unaffected by the appearance of hashedLong in the stream
++     */
++    boolean offerHashed(long hashedLong);
++
++    /**
++     * Offer the value as a hashed long value
++     *
++     * @param hashedInt - the hash of the item to offer to the estimator
++     * @return false if the value returned by cardinality() is unaffected by the appearance of hashedInt in the stream
++     */
++    boolean offerHashed(int hashedInt);
++
++    /**
++     * @return the number of unique elements in the stream or an estimate thereof
++     */
++    long cardinality();
++
++    /**
++     * @return size in bytes needed for serialization
++     */
++    int sizeof();
++
++    /**
++     * @return byte[]
++     * @throws IOException
++     */
++    byte[] getBytes() throws IOException;
++
++    /**
++     * Merges estimators to produce a new estimator for the combined streams
++     * of this estimator and those passed as arguments.
++     * <p/>
++     * Nor this estimator nor the one passed as parameters are modified.
++     *
++     * @param estimators Zero or more compatible estimators
++     * @throws CardinalityMergeException If at least one of the estimators is not compatible with this one
++     */
++    ICardinality merge(ICardinality... estimators) throws CardinalityMergeException;
++}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/hll/MurmurHash.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/hll/MurmurHash.java
index 0000000,0000000..139e029
new file mode 100755
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/hll/MurmurHash.java
@@@ -1,0 -1,0 +1,245 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package com.gemstone.gemfire.internal.hll;
++
++
++/**
++ * This is a very fast, non-cryptographic hash suitable for general hash-based
++ * lookup. See http://murmurhash.googlepages.com/ for more details.
++ * <p/>
++ * <p>
++ * The C version of MurmurHash 2.0 found at that site was ported to Java by
++ * Andrzej Bialecki (ab at getopt org).
++ * </p>
++ */
++public class MurmurHash
++{
++    public static int hash(Object o)
++    {
++        if (o == null)
++        {
++            return 0;
++        }
++        if (o instanceof Long)
++        {
++            return hashLong((Long) o);
++        }
++        if (o instanceof Integer)
++        {
++            return hashLong((Integer) o);
++        }
++        if (o instanceof Double)
++        {
++            return hashLong(Double.doubleToRawLongBits((Double) o));
++        }
++        if (o instanceof Float)
++        {
++            return hashLong(Float.floatToRawIntBits((Float) o));
++        }
++        if (o instanceof String)
++        {
++            return hash(((String) o).getBytes());
++        }
++        if (o instanceof byte[])
++        {
++            return hash((byte[]) o);
++        }
++        return hash(o.toString());
++    }
++
++    public static int hash(byte[] data)
++    {
++        return hash(data, 0, data.length, -1);
++    }
++
++    public static int hash(byte[] data, int seed)
++    {
++        return hash(data, 0, data.length, seed);
++    }
++
++    public static int hash(byte[] data, int offset, int length, int seed)
++    {
++        int m = 0x5bd1e995;
++        int r = 24;
++
++        int h = seed ^ length;
++
++        int len_4 = length >> 2;
++
++        for (int i = 0; i < len_4; i++)
++        {
++            int i_4 = i << 2;
++            int k = data[offset + i_4 + 3];
++            k = k << 8;
++            k = k | (data[offset + i_4 + 2] & 0xff);
++            k = k << 8;
++            k = k | (data[offset + i_4 + 1] & 0xff);
++            k = k << 8;
++            k = k | (data[offset + i_4 + 0] & 0xff);
++            k *= m;
++            k ^= k >>> r;
++            k *= m;
++            h *= m;
++            h ^= k;
++        }
++
++        // avoid calculating modulo
++        int len_m = len_4 << 2;
++        int left = length - len_m;
++
++        if (left != 0)
++        {
++            if (left >= 3)
++            {
++                h ^= (int) data[offset + length - 3] << 16;
++            }
++            if (left >= 2)
++            {
++                h ^= (int) data[offset + length - 2] << 8;
++            }
++            if (left >= 1)
++            {
++                h ^= (int) data[offset + length - 1];
++            }
++
++            h *= m;
++        }
++
++        h ^= h >>> 13;
++        h *= m;
++        h ^= h >>> 15;
++
++        return h;
++    }
++
++    public static int hashLong(long data)
++    {
++        int m = 0x5bd1e995;
++        int r = 24;
++
++        int h = 0;
++
++        int k = (int) data * m;
++        k ^= k >>> r;
++        h ^= k * m;
++
++        k = (int) (data >> 32) * m;
++        k ^= k >>> r;
++        h *= m;
++        h ^= k * m;
++
++        h ^= h >>> 13;
++        h *= m;
++        h ^= h >>> 15;
++
++        return h;
++    }
++
++    public static long hash64(Object o)
++    {
++        if (o == null)
++        {
++            return 0l;
++        }
++        else if (o instanceof String)
++        {
++            final byte[] bytes = ((String) o).getBytes();
++            return hash64(bytes, bytes.length);
++        }
++        else if (o instanceof byte[])
++        {
++            final byte[] bytes = (byte[]) o;
++            return hash64(bytes, bytes.length);
++        }
++        return hash64(o.toString());
++    }
++
++    // 64 bit implementation copied from here:  https://github.com/tnm/murmurhash-java
++
++    /**
++     * Generates 64 bit hash from byte array with default seed value.
++     *
++     * @param data   byte array to hash
++     * @param length length of the array to hash
++     * @return 64 bit hash of the given string
++     */
++    public static long hash64(final byte[] data, int length)
++    {
++        return hash64(data, length, 0xe17a1465);
++    }
++
++
++    /**
++     * Generates 64 bit hash from byte array of the given length and seed.
++     *
++     * @param data   byte array to hash
++     * @param length length of the array to hash
++     * @param seed   initial seed value
++     * @return 64 bit hash of the given array
++     */
++    public static long hash64(final byte[] data, int length, int seed)
++    {
++        final long m = 0xc6a4a7935bd1e995L;
++        final int r = 47;
++
++        long h = (seed & 0xffffffffl) ^ (length * m);
++
++        int length8 = length / 8;
++
++        for (int i = 0; i < length8; i++)
++        {
++            final int i8 = i * 8;
++            long k = ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8)
++                    + (((long) data[i8 + 2] & 0xff) << 16) + (((long) data[i8 + 3] & 0xff) << 24)
++                    + (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) << 40)
++                    + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56);
++
++            k *= m;
++            k ^= k >>> r;
++            k *= m;
++
++            h ^= k;
++            h *= m;
++        }
++
++        switch (length % 8)
++        {
++            case 7:
++                h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48;
++            case 6:
++                h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40;
++            case 5:
++                h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32;
++            case 4:
++                h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24;
++            case 3:
++                h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16;
++            case 2:
++                h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8;
++            case 1:
++                h ^= (long) (data[length & ~7] & 0xff);
++                h *= m;
++        }
++        ;
++
++        h ^= h >>> r;
++        h *= m;
++        h ^= h >>> r;
++
++        return h;
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/hll/RegisterSet.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/hll/RegisterSet.java
index 0000000,0000000..e35340f
new file mode 100755
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/hll/RegisterSet.java
@@@ -1,0 -1,0 +1,110 @@@
++/*
++ * Copyright (C) 2012 Clearspring Technologies, Inc.
++ *
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package com.gemstone.gemfire.internal.hll;
++
++
++public class RegisterSet {
++
++    public final static int LOG2_BITS_PER_WORD = 6;
++    public final static int REGISTER_SIZE = 5;
++
++    public final int count;
++    public final int size;
++
++    private final int[] M;
++
++    public RegisterSet(int count) {
++        this(count, null);
++    }
++
++    public RegisterSet(int count, int[] initialValues) {
++        this.count = count;
++
++        if (initialValues == null) {
++            this.M = new int[getSizeForCount(count)];
++        } else {
++            this.M = initialValues;
++        }
++        this.size = this.M.length;
++    }
++
++    public static int getBits(int count) {
++        return count / LOG2_BITS_PER_WORD;
++    }
++
++    public static int getSizeForCount(int count) {
++        int bits = getBits(count);
++        if (bits == 0) {
++            return 1;
++        } else if (bits % Integer.SIZE == 0) {
++            return bits;
++        } else {
++            return bits + 1;
++        }
++    }
++
++    public void set(int position, int value) {
++        int bucketPos = position / LOG2_BITS_PER_WORD;
++        int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD));
++        this.M[bucketPos] = (this.M[bucketPos] & ~(0x1f << shift)) | (value << shift);
++    }
++
++    public int get(int position) {
++        int bucketPos = position / LOG2_BITS_PER_WORD;
++        int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD));
++        return (this.M[bucketPos] & (0x1f << shift)) >>> shift;
++    }
++
++    public boolean updateIfGreater(int position, int value) {
++        int bucket = position / LOG2_BITS_PER_WORD;
++        int shift = REGISTER_SIZE * (position - (bucket * LOG2_BITS_PER_WORD));
++        int mask = 0x1f << shift;
++
++        // Use long to avoid sign issues with the left-most shift
++        long curVal = this.M[bucket] & mask;
++        long newVal = value << shift;
++        if (curVal < newVal) {
++            this.M[bucket] = (int) ((this.M[bucket] & ~mask) | newVal);
++            return true;
++        } else {
++            return false;
++        }
++    }
++
++    public void merge(RegisterSet that) {
++        for (int bucket = 0; bucket < M.length; bucket++) {
++            int word = 0;
++            for (int j = 0; j < LOG2_BITS_PER_WORD; j++) {
++                int mask = 0x1f << (REGISTER_SIZE * j);
++
++                int thisVal = (this.M[bucket] & mask);
++                int thatVal = (that.M[bucket] & mask);
++                word |= (thisVal < thatVal) ? thatVal : thisVal;
++            }
++            this.M[bucket] = word;
++        }
++    }
++
++    int[] readOnlyBits() {
++        return M;
++    }
++
++    public int[] bits() {
++        int[] copy = new int[size];
++        System.arraycopy(M, 0, copy, 0, M.length);
++        return copy;
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
index f9539e5,0000000..dcd83cb
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
@@@ -1,553 -1,0 +1,553 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.internal.redis;
 +
 +import java.io.Closeable;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.ScheduledFuture;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.locks.Lock;
 +import java.util.concurrent.locks.ReentrantLock;
 +
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheTransactionManager;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.RegionShortcut;
 +import com.gemstone.gemfire.cache.TransactionId;
 +import com.gemstone.gemfire.cache.query.IndexExistsException;
 +import com.gemstone.gemfire.cache.query.IndexInvalidException;
 +import com.gemstone.gemfire.cache.query.IndexNameConflictException;
 +import com.gemstone.gemfire.cache.query.Query;
 +import com.gemstone.gemfire.cache.query.QueryInvalidException;
 +import com.gemstone.gemfire.cache.query.QueryService;
 +import com.gemstone.gemfire.cache.query.RegionNotFoundException;
 +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 +import com.gemstone.gemfire.internal.redis.executor.ExpirationExecutor;
 +import com.gemstone.gemfire.internal.redis.executor.ListQuery;
 +import com.gemstone.gemfire.internal.redis.executor.SortedSetQuery;
- import com.gemstone.gemfire.internal.redis.executor.hll.HyperLogLogPlus;
++import com.gemstone.gemfire.internal.hll.HyperLogLogPlus;
 +import com.gemstone.gemfire.management.cli.Result;
 +import com.gemstone.gemfire.management.cli.Result.Status;
 +import com.gemstone.gemfire.management.internal.cli.commands.CreateAlterDestroyRegionCommands;
 +import com.gemstone.gemfire.redis.GemFireRedisServer;
 +
 +/**
 + * This class stands between {@link Executor} and {@link Cache#getRegion(String)}.
 + * This is needed because some keys for Redis represented as a {@link Region} in
 + * {@link GemFireRedisServer} come with additional state. Therefore getting, creating,
 + * or destroying a {@link Region} needs to be synchronized, which is done away with
 + * and abstracted by this class.
 + * 
 + * @author Vitaly Gavrilov
 + *
 + */
 +public class RegionProvider implements Closeable {
 +
 +  private final ConcurrentHashMap<ByteArrayWrapper, Region<?, ?>> regions;
 +
 +  /**
 +   * This is the Redis meta data {@link Region} that holds the {@link RedisDataType}
 +   * information for all Regions created. The mapping is a {@link String} key which is the name
 +   * of the {@link Region} created to hold the data to the RedisDataType it contains.
 +   */
 +  private final Region<String, RedisDataType> redisMetaRegion;
 +
 +  /**
 +   * This is the {@link RedisDataType#REDIS_STRING} {@link Region}. This is the Region
 +   * that stores all string contents
 +   */
 +  private final Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion;
 +
 +  /**
 +   * This is the {@link RedisDataType#REDIS_HLL} {@link Region}. This is the Region
 +   * that stores all HyperLogLog contents
 +   */
 +  private final Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion;
 +
 +  private final Cache cache;
 +  private final QueryService queryService;
 +  private final ConcurrentMap<ByteArrayWrapper, Map<Enum<?>, Query>> preparedQueries = new ConcurrentHashMap<ByteArrayWrapper, Map<Enum<?>, Query>>();
 +  private final ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationsMap;
 +  private final ScheduledExecutorService expirationExecutor;
 +  private final RegionShortcut defaultRegionType;
 +  private static final CreateAlterDestroyRegionCommands cliCmds = new CreateAlterDestroyRegionCommands();
 +  private final ConcurrentHashMap<String, Lock> locks;
 +
 +  public RegionProvider(Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion, Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion, Region<String, RedisDataType> redisMetaRegion, ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationsMap, ScheduledExecutorService expirationExecutor, RegionShortcut defaultShortcut) {
 +    if (stringsRegion == null || hLLRegion == null || redisMetaRegion == null)
 +      throw new NullPointerException();
 +    this.regions = new ConcurrentHashMap<ByteArrayWrapper, Region<?, ?>>();
 +    this.stringsRegion = stringsRegion;
 +    this.hLLRegion = hLLRegion;
 +    this.redisMetaRegion = redisMetaRegion;
 +    this.cache = GemFireCacheImpl.getInstance();
 +    this.queryService = cache.getQueryService();
 +    this.expirationsMap = expirationsMap;
 +    this.expirationExecutor = expirationExecutor;
 +    this.defaultRegionType = defaultShortcut;
 +    this.locks = new ConcurrentHashMap<String, Lock>();
 +  }
 +
 +  public boolean existsKey(ByteArrayWrapper key) {
 +    return this.redisMetaRegion.containsKey(key.toString());
 +  }
 +
 +  public Set<String> metaKeySet() {
 +    return this.redisMetaRegion.keySet();
 +  }
 +
 +  public Set<Map.Entry<String, RedisDataType>> metaEntrySet() {
 +    return this.redisMetaRegion.entrySet();
 +  }
 +
 +  public int getMetaSize() {
 +    return this.redisMetaRegion.size() - RedisConstants.NUM_DEFAULT_KEYS;
 +  }
 +
 +  private boolean metaRemoveEntry(ByteArrayWrapper key) {
 +    return this.redisMetaRegion.remove(key.toString()) != null;
 +  }
 +
 +  public RedisDataType metaPutIfAbsent(ByteArrayWrapper key, RedisDataType value) {
 +    return this.redisMetaRegion.putIfAbsent(key.toString(), value);
 +  }
 +
 +  public RedisDataType metaPut(ByteArrayWrapper key, RedisDataType value) {
 +    return this.redisMetaRegion.put(key.toString(), value);
 +  }
 +
 +  public RedisDataType metaGet(ByteArrayWrapper key) {
 +    return this.redisMetaRegion.get(key.toString());
 +  }
 +
 +  public Region<?, ?> getRegion(ByteArrayWrapper key) {
 +    return this.regions.get(key);
 +  }
 +
 +  public void removeRegionReferenceLocally(ByteArrayWrapper key, RedisDataType type) {
 +    Lock lock = this.locks.get(key.toString());
 +    boolean locked = false;
 +    try {
 +      locked = lock.tryLock();
 +      // If we cannot get the lock we ignore this remote event, this key has local event
 +      // that started independently, ignore this event to prevent deadlock
 +      if (locked) {
 +        cancelKeyExpiration(key);
 +        removeRegionState(key, type);
 +      }
 +    } finally {
 +      if (locked) {
 +        lock.unlock();
 +      }
 +    }
 +  }
 +
 +  public boolean removeKey(ByteArrayWrapper key) {
 +    RedisDataType type = getRedisDataType(key);
 +    return removeKey(key, type);
 +  }
 +
 +  public boolean removeKey(ByteArrayWrapper key, RedisDataType type) {
 +    return removeKey(key, type, true);
 +  }
 +
 +  public boolean removeKey(ByteArrayWrapper key, RedisDataType type, boolean cancelExpiration) {
 +    if (type == null || type == RedisDataType.REDIS_PROTECTED)
 +      return false;
 +    Lock lock = this.locks.get(key.toString());
 +    try {
 +      if (lock != null)  {// Strings/hlls will not have locks
 +        lock.lock();
 +      }
 +      metaRemoveEntry(key);
 +      try {
 +        if (type == RedisDataType.REDIS_STRING) {
 +          return this.stringsRegion.remove(key) != null;
 +        } else if (type == RedisDataType.REDIS_HLL) {
 +          return this.hLLRegion.remove(key) != null;
 +        } else {
 +          return destroyRegion(key, type);
 +        }
 +      } catch (Exception exc) {
 +        return false;
 +      } finally {
 +        if (cancelExpiration)
 +          cancelKeyExpiration(key);
 +        else
 +          removeKeyExpiration(key);
 +        if (lock != null)
 +          this.locks.remove(key.toString());
 +      }
 +    } finally {
 +      if (lock != null) {
 +        lock.unlock();
 +      }
 +    }
 +  }
 +
 +  public Region<?, ?> getOrCreateRegion(ByteArrayWrapper key, RedisDataType type, ExecutionHandlerContext context) {
 +    return getOrCreateRegion0(key, type, context, true);
 +  }
 +
 +  public void createRemoteRegionReferenceLocally(ByteArrayWrapper key, RedisDataType type) {
 +    if (type == null || type == RedisDataType.REDIS_STRING || type == RedisDataType.REDIS_HLL)
 +      return;
 +    Region<?, ?> r = this.regions.get(key);
 +    if (r != null)
 +      return;
 +    if (!this.regions.contains(key)) {
 +      String stringKey = key.toString();
 +      Lock lock = this.locks.get(stringKey);
 +      if (lock == null) {
 +        this.locks.putIfAbsent(stringKey, new ReentrantLock());
 +        lock = this.locks.get(stringKey);
 +      }
 +      boolean locked = false;
 +      try {
 +        locked = lock.tryLock();
 +        // If we cannot get the lock then this remote event may have been initialized
 +        // independently on this machine, so if we wait on the lock it is more than
 +        // likely we will deadlock just to do the same task. This event can be ignored
 +        if (locked) {
 +          r = cache.getRegion(key.toString());
 +          // If r is null, this implies that we are after a create/destroy
 +          // simply ignore. Calls to getRegion or getOrCreate will work correctly
 +          if (r == null)
 +            return;
 +
 +          if (type == RedisDataType.REDIS_LIST) {
 +            doInitializeList(key, r);
 +          } else if (type == RedisDataType.REDIS_SORTEDSET) {
 +            try {
 +              doInitializeSortedSet(key, r);
 +            } catch (RegionNotFoundException | IndexInvalidException e) {
 +              //ignore
 +            }
 +          }
 +          this.regions.put(key, r);
 +        }
 +      } finally {
 +        if (locked) {
 +          lock.unlock();
 +        }
 +      }
 +    }
 +  }
 +
 +  private Region<?, ?> getOrCreateRegion0(ByteArrayWrapper key, RedisDataType type, ExecutionHandlerContext context, boolean addToMeta) {
 +    checkDataType(key, type);
 +    Region<?, ?> r = this.regions.get(key);
 +    if (r != null && r.isDestroyed()) {
 +      removeKey(key, type);
 +      r = null;
 +    }
 +    if (r == null) {
 +      String stringKey = key.toString();
 +      Lock lock = this.locks.get(stringKey);
 +      if (lock == null) {
 +        this.locks.putIfAbsent(stringKey, new ReentrantLock());
 +        lock = this.locks.get(stringKey);
 +      }
 +
 +      try {
 +        lock.lock();
 +        r = regions.get(key);
 +        if (r == null) {
 +          boolean hasTransaction = context != null && context.hasTransaction(); // Can create without context
 +          CacheTransactionManager txm = null;
 +          TransactionId transactionId = null;
 +          try {
 +            if (hasTransaction) {
 +              txm = cache.getCacheTransactionManager();
 +              transactionId = txm.suspend();
 +            }
 +            Exception concurrentCreateDestroyException = null;
 +            do {
 +              concurrentCreateDestroyException = null;
 +              r = createRegionGlobally(stringKey);
 +              try {
 +                if (type == RedisDataType.REDIS_LIST) {
 +                  doInitializeList(key, r);
 +                } else if (type == RedisDataType.REDIS_SORTEDSET) {
 +                  try {
 +                    doInitializeSortedSet(key, r);
 +                  } catch (RegionNotFoundException | IndexInvalidException e) {
 +                    concurrentCreateDestroyException = e;
 +                  }
 +                }
 +              } catch (QueryInvalidException e) {
 +                if (e.getCause() instanceof RegionNotFoundException) {
 +                  concurrentCreateDestroyException = e;
 +                }
 +              }
 +            } while(concurrentCreateDestroyException != null);
 +            this.regions.put(key, r);            
 +            if (addToMeta) {
 +              RedisDataType existingType = metaPutIfAbsent(key, type);
 +              if (existingType != null && existingType != type)
 +                throw new RedisDataTypeMismatchException("The key name \"" + key + "\" is
already used by a " + existingType.toString());
 +            }
 +          } finally {
 +            if (hasTransaction)
 +              txm.resume(transactionId);
 +          }
 +        }
 +      } finally {
 +        lock.unlock();
 +      }
 +    }
 +    return r;
 +  }
 +
  /**
   * Destroys the region backing the given key and removes the local state
   * associated with it.
   * <p>
   * SYNCHRONIZE EXTERNALLY OF this.locks.get(key.toString())!!!!!
   * 
   * @param key Key of region to destroy
   * @param type Type of region to destroy
   * @return Flag if destroyed
   */
  private boolean destroyRegion(ByteArrayWrapper key, RedisDataType type) {
    Region<?, ?> r = this.regions.get(key);
    if (r != null) {
      try {
        r.destroyRegion();
      } catch (Exception e) {
        // Best-effort: any failure (e.g. a concurrent remote destroy) is
        // reported as false; local bookkeeping is still cleaned up in finally
        return false;
      } finally {
        removeRegionState(key, type);
      }
    }
    return true;
  }
 +
  /**
   * Do not call this method if you are not synchronized on the lock associated with this key
   * 
   * @param key Key of region to remove
   * @param type Type of key to remove all state
   */
  private void removeRegionState(ByteArrayWrapper key, RedisDataType type) {
    // Drop the cached prepared queries and the region reference; the type
    // parameter is currently unused here but kept for caller symmetry
    this.preparedQueries.remove(key);
    this.regions.remove(key);
  }
 +
 +  private void doInitializeSortedSet(ByteArrayWrapper key, Region<?, ?> r) throws
RegionNotFoundException, IndexInvalidException {
 +    String fullpath = r.getFullPath();
 +    try {
 +      queryService.createIndex("scoreIndex", "entry.value.score", r.getFullPath() + ".entrySet
entry");
 +      queryService.createIndex("scoreIndex2", "value.score", r.getFullPath() + ".values
value");
 +    } catch (IndexNameConflictException | IndexExistsException | UnsupportedOperationException
e) {
 +      // ignore, these indexes already exist or unsupported but make sure prepared queries
are made
 +    }
 +    HashMap<Enum<?>, Query> queryList = new HashMap<Enum<?>, Query>();
 +    for (SortedSetQuery lq: SortedSetQuery.values()) {
 +      String queryString = lq.getQueryString(fullpath);
 +      Query query = this.queryService.newQuery(queryString);
 +      queryList.put(lq, query);
 +    }
 +    this.preparedQueries.put(key, queryList);
 +  }
 +
 +  private void doInitializeList(ByteArrayWrapper key, Region r) {
 +    r.put("head", Integer.valueOf(0));
 +    r.put("tail", Integer.valueOf(0));
 +    String fullpath = r.getFullPath();
 +    HashMap<Enum<?>, Query> queryList = new HashMap<Enum<?>, Query>();
 +    for (ListQuery lq: ListQuery.values()) {
 +      String queryString = lq.getQueryString(fullpath);
 +      Query query = this.queryService.newQuery(queryString);
 +      queryList.put(lq, query);
 +    }
 +    this.preparedQueries.put(key, queryList);
 +  }
 +
 +  /**
 +   * This method creates a Region globally with the given name. If
 +   * there is an error in the creation, a runtime exception will
 +   * be thrown.
 +   * 
 +   * @param key Name of Region to create
 +   * @return Region Region created globally
 +   */
 +  private Region<?, ?> createRegionGlobally(String key) {
 +    Region<?, ?> r = null;
 +    r = cache.getRegion(key);
 +    if (r != null) return r;
 +    do {
 +      Result result = cliCmds.createRegion(key, defaultRegionType, null, null, true, null,
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null);
 +      r = cache.getRegion(key);
 +      if (result.getStatus() == Status.ERROR && r == null) {
 +        String err = "";
 +        while(result.hasNextLine())
 +          err += result.nextLine();
 +        throw new RegionCreationException(err);
 +      }
 +    } while(r == null); // The region can be null in the case that it is concurrently destroyed
by
 +    // a remote even triggered internally by Geode
 +    return r;
 +  }
 +
 +  public Query getQuery(ByteArrayWrapper key, Enum<?> query) {
 +    return this.preparedQueries.get(key).get(query);
 +    /*
 +    if (query instanceof ListQuery) {
 +      return this.queryService.newQuery(((ListQuery)query).getQueryString(this.regions.get(key).getFullPath()));
 +    } else {
 +      return this.queryService.newQuery(((SortedSetQuery)query).getQueryString(this.regions.get(key).getFullPath()));
 +    }
 +     */
 +  }
 +
 +  /**
 +   * Checks if the given key is associated with the passed data type.
 +   * If there is a mismatch, a {@link RuntimeException} is thrown
 +   * 
 +   * @param key Key to check
 +   * @param type Type to check to
 +   */
 +  protected void checkDataType(ByteArrayWrapper key, RedisDataType type) {
 +    RedisDataType currentType = redisMetaRegion.get(key.toString());
 +    if (currentType == null)
 +      return;
 +    if (currentType == RedisDataType.REDIS_PROTECTED)
 +      throw new RedisDataTypeMismatchException("The key name \"" + key + "\" is protected");
 +    if (currentType != type)
 +      throw new RedisDataTypeMismatchException("The key name \"" + key + "\" is already
used by a " + currentType.toString());
 +  }
 +
  /**
   * Checks whether a region is locally cached for the given key.
   *
   * @param key Key to look up
   * @return True if a region for this key is known locally, false otherwise
   */
  public boolean regionExists(ByteArrayWrapper key) {
    return this.regions.containsKey(key);
  }
 +
  /**
   * Returns the region holding all Redis string values.
   *
   * @return The strings region
   */
  public Region<ByteArrayWrapper, ByteArrayWrapper> getStringsRegion() {
    return this.stringsRegion;
  }
 +
  /**
   * Returns the region holding all HyperLogLog estimators.
   *
   * @return The HyperLogLog region
   */
  public Region<ByteArrayWrapper, HyperLogLogPlus> gethLLRegion() {
    return this.hLLRegion;
  }
 +
  // Looks up the Redis data type recorded for the given key in the meta
  // region; returns null when the key is unknown
  private RedisDataType getRedisDataType(String key) {
    return this.redisMetaRegion.get(key);
  }
 +
  /**
   * Returns the Redis data type recorded for the given key.
   *
   * @param key Key to look up
   * @return The data type, or null if the key is not in the meta region
   */
  public RedisDataType getRedisDataType(ByteArrayWrapper key) {
    return getRedisDataType(key.toString());
  }
 +
 +  /**
 +   * Sets the expiration for a key. The setting and modifying of a key expiration can only
be set by a delay,
 +   * which means that both expiring after a time and at a time can be done but
 +   * the delay to expire at a time must be calculated before these calls. It is
 +   * also important to note that the delay is always handled in milliseconds
 +   * 
 +   * @param key The key to set the expiration for
 +   * @param delay The delay in milliseconds of the expiration
 +   * @return True is expiration set, false otherwise
 +   */
 +  public final boolean setExpiration(ByteArrayWrapper key, long delay) {
 +    RedisDataType type = getRedisDataType(key);
 +    if (type == null)
 +      return false;
 +    ScheduledFuture<?> future = this.expirationExecutor.schedule(new ExpirationExecutor(key,
type, this), delay, TimeUnit.MILLISECONDS);
 +    this.expirationsMap.put(key, future);
 +    return true;
 +  }
 +
 +  /**
 +   * Modifies an expiration on a key
 +   * 
 +   * @param key String key to modify expiration on
 +   * @param delay Delay in milliseconds to reset the expiration to
 +   * @return True if reset, false if not
 +   */
 +  public final boolean modifyExpiration(ByteArrayWrapper key, long delay) {
 +    /*
 +     * Attempt to cancel future task
 +     */
 +    boolean canceled = cancelKeyExpiration(key);
 +
 +    if (!canceled)
 +      return false;
 +
 +    RedisDataType type = getRedisDataType(key);
 +    if (type == null)
 +      return false;
 +
 +    ScheduledFuture<?> future = this.expirationExecutor.schedule(new ExpirationExecutor(key,
type, this), delay, TimeUnit.MILLISECONDS);
 +    this.expirationsMap.put(key, future);
 +    return true;
 +  }
 +
 +  /**
 +   * Removes an expiration from a key
 +   * 
 +   * @param key Key
 +   * @return True is expiration cancelled on the key, false otherwise
 +   */
 +  public final boolean cancelKeyExpiration(ByteArrayWrapper key) {
 +    ScheduledFuture<?> future = expirationsMap.remove(key);
 +    if (future == null)
 +      return false;
 +    return future.cancel(false);
 +  }
 +
  // Drops the expiration entry for the key without attempting to cancel the
  // scheduled task; returns true if an entry was actually removed
  private boolean removeKeyExpiration(ByteArrayWrapper key) {
    return expirationsMap.remove(key) != null;
  }
 +
  /**
   * Check method if key has expiration
   * 
   * @param key Key to check
   * @return True if key has expiration, false otherwise
   */
  public boolean hasExpiration(ByteArrayWrapper key) {
    return this.expirationsMap.containsKey(key);
  }
 +
 +  /**
 +   * Get remaining expiration time
 +   * 
 +   * @param key Key
 +   * @return Remaining time in milliseconds or 0 if no delay or key doesn't exist
 +   */
 +  public final long getExpirationDelayMillis(ByteArrayWrapper key) {
 +    ScheduledFuture<?> future = this.expirationsMap.get(key);
 +    return future != null ? future.getDelay(TimeUnit.MILLISECONDS) : 0L;
 +  }
 +
  @Override
  public void close() {
    // Releases the prepared-query cache only.
    // NOTE(review): the expirationExecutor is not shut down here — confirm it
    // is stopped elsewhere, otherwise its threads outlive this provider
    this.preparedQueries.clear();
  }
 +
 +  public String dumpRegionsCache() {
 +    StringBuilder builder = new StringBuilder();
 +    for (Entry<ByteArrayWrapper, Region<?, ?>> e : this.regions.entrySet())
{
 +      builder.append(e.getKey() + " --> {" + e.getValue() + "}\n");
 +    }
 +    return builder.toString();
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFAddExecutor.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFAddExecutor.java
index 95af34a,0000000..150078a
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFAddExecutor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFAddExecutor.java
@@@ -1,65 -1,0 +1,66 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.internal.redis.executor.hll;
 +
 +import java.util.List;
 +
 +import com.gemstone.gemfire.cache.Region;
++import com.gemstone.gemfire.internal.hll.HyperLogLogPlus;
 +import com.gemstone.gemfire.internal.redis.ByteArrayWrapper;
 +import com.gemstone.gemfire.internal.redis.Command;
 +import com.gemstone.gemfire.internal.redis.Coder;
 +import com.gemstone.gemfire.internal.redis.ExecutionHandlerContext;
 +import com.gemstone.gemfire.internal.redis.RedisConstants.ArityDef;
 +
 +public class PFAddExecutor extends HllExecutor {
 +
 +  @Override
 +  public void executeCommand(Command command, ExecutionHandlerContext context) {
 +    List<byte[]> commandElems = command.getProcessedCommand();
 +
 +    if (commandElems.size() < 2) {
 +      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.PFADD));
 +      return;
 +    }
 +
 +    ByteArrayWrapper key = command.getKey();
 +    checkAndSetDataType(key, context);
 +    Region<ByteArrayWrapper, HyperLogLogPlus> keyRegion = context.getRegionProvider().gethLLRegion();
 +
 +    HyperLogLogPlus hll = keyRegion.get(key);
 +
 +    boolean changed = false;
 +
 +    if (hll == null)
 +      hll = new HyperLogLogPlus(DEFAULT_HLL_DENSE);
 +
 +    for (int i = 2; i < commandElems.size(); i++) {
 +      byte[] bytes = commandElems.get(i);
 +      boolean offerChange = hll.offer(bytes);
 +      if (offerChange)
 +        changed = true;
 +    }
 +
 +    keyRegion.put(key, hll);
 +
 +    if (changed)
 +      command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), 1));
 +    else
 +      command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), 0));
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFCountExecutor.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFCountExecutor.java
index e081022,0000000..625a934
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFCountExecutor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFCountExecutor.java
@@@ -1,68 -1,0 +1,70 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.internal.redis.executor.hll;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import com.gemstone.gemfire.cache.Region;
++import com.gemstone.gemfire.internal.hll.CardinalityMergeException;
++import com.gemstone.gemfire.internal.hll.HyperLogLogPlus;
 +import com.gemstone.gemfire.internal.redis.ByteArrayWrapper;
 +import com.gemstone.gemfire.internal.redis.Coder;
 +import com.gemstone.gemfire.internal.redis.Command;
 +import com.gemstone.gemfire.internal.redis.ExecutionHandlerContext;
 +import com.gemstone.gemfire.internal.redis.RedisConstants.ArityDef;
 +import com.gemstone.gemfire.internal.redis.RedisDataType;
 +
 +public class PFCountExecutor extends HllExecutor {
 +
 +  @Override
 +  public void executeCommand(Command command, ExecutionHandlerContext context) {
 +    List<byte[]> commandElems = command.getProcessedCommand();
 +
 +    if (commandElems.size() < 2) {
 +      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.PFCOUNT));
 +      return;
 +    }
 +
 +    Region<ByteArrayWrapper, HyperLogLogPlus> keyRegion = context.getRegionProvider().gethLLRegion();
 +
 +    List<HyperLogLogPlus> hlls = new ArrayList<HyperLogLogPlus>();
 +
 +    for (int i = 1; i < commandElems.size(); i++) {
 +      ByteArrayWrapper k = new ByteArrayWrapper(commandElems.get(i));
 +      checkDataType(k, RedisDataType.REDIS_HLL, context);
 +      HyperLogLogPlus h = keyRegion.get(k);
 +      if (h != null)
 +        hlls.add(h);
 +    }
 +    if (hlls.isEmpty()) {
 +      command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), 0));
 +      return;
 +    }
 +
 +    HyperLogLogPlus tmp = hlls.remove(0);
 +    HyperLogLogPlus[] estimators = hlls.toArray(new HyperLogLogPlus[hlls.size()]);
 +    try {
 +      tmp = (HyperLogLogPlus) tmp.merge(estimators);
 +    } catch (CardinalityMergeException e) {
 +      throw new RuntimeException(e);
 +    }
 +    long cardinality = tmp.cardinality();
 +    command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), cardinality));
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFMergeExecutor.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFMergeExecutor.java
index dd9afbc,0000000..f059cc2
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFMergeExecutor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/executor/hll/PFMergeExecutor.java
@@@ -1,72 -1,0 +1,74 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.internal.redis.executor.hll;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import com.gemstone.gemfire.cache.Region;
++import com.gemstone.gemfire.internal.hll.CardinalityMergeException;
++import com.gemstone.gemfire.internal.hll.HyperLogLogPlus;
 +import com.gemstone.gemfire.internal.redis.ByteArrayWrapper;
 +import com.gemstone.gemfire.internal.redis.Coder;
 +import com.gemstone.gemfire.internal.redis.Command;
 +import com.gemstone.gemfire.internal.redis.ExecutionHandlerContext;
 +import com.gemstone.gemfire.internal.redis.RedisDataType;
 +import com.gemstone.gemfire.internal.redis.RedisConstants.ArityDef;
 +
 +public class PFMergeExecutor extends HllExecutor {
 +
 +  @Override
 +  public void executeCommand(Command command, ExecutionHandlerContext context) {
 +    List<byte[]> commandElems = command.getProcessedCommand();
 +
 +    if (commandElems.size() < 3) {
 +      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.PFMERGE));
 +      return;
 +    }
 +
 +    ByteArrayWrapper destKey = command.getKey();
 +    checkAndSetDataType(destKey, context);
 +    Region<ByteArrayWrapper, HyperLogLogPlus> keyRegion = context.getRegionProvider().gethLLRegion();
 +    HyperLogLogPlus mergedHLL = keyRegion.get(destKey);
 +    if (mergedHLL == null)
 +      mergedHLL = new HyperLogLogPlus(DEFAULT_HLL_DENSE);
 +    List<HyperLogLogPlus> hlls = new ArrayList<HyperLogLogPlus>();
 +
 +    for (int i = 2; i < commandElems.size(); i++) {
 +      ByteArrayWrapper k = new ByteArrayWrapper(commandElems.get(i));
 +      checkDataType(k, RedisDataType.REDIS_HLL, context);
 +      HyperLogLogPlus h = keyRegion.get(k);
 +      if (h != null)
 +        hlls.add(h);
 +    }
 +    if (hlls.isEmpty()) {
 +      context.getRegionProvider().removeKey(destKey);
 +      command.setResponse(Coder.getSimpleStringResponse(context.getByteBufAllocator(), "OK"));
 +      return;
 +    }
 +
 +    HyperLogLogPlus[] estimators = hlls.toArray(new HyperLogLogPlus[hlls.size()]);
 +    try {
 +      mergedHLL = (HyperLogLogPlus) mergedHLL.merge(estimators);
 +    } catch (CardinalityMergeException e) {
 +      throw new RuntimeException(e);
 +    }
 +    keyRegion.put(destKey, mergedHLL);
 +    command.setResponse(Coder.getSimpleStringResponse(context.getByteBufAllocator(), "OK"));
 +  }
 +
 +}



Mime
View raw message