From: rvs@apache.org
To: commits@geode.incubator.apache.org
Date: Fri, 03 Jul 2015 19:21:30 -0000
Subject: [29/51] [partial] incubator-geode git commit: SGA #2

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
new file mode 100644
index 0000000..e7e75dc
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
@@ -0,0 +1,67 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+
+/**
+ * Implementation of HDFSStoreFactory
+ *
+ * @author Hemant Bhanawat
+ */
+public class HDFSStoreFactoryImpl extends HDFSStoreCreation {
+  public static final String DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS = "HDFS_QUEUE";
+
+  private Cache cache;
+
+  public HDFSStoreFactoryImpl(Cache cache) {
+    this(cache, null);
+  }
+
+  public HDFSStoreFactoryImpl(Cache cache, HDFSStoreCreation config) {
+    super(config);
+    this.cache = cache;
+  }
+
+  @Override
+  public HDFSStore create(String name) {
+    if (name == null) {
+      throw new GemFireConfigException("HDFS store name not provided");
+    }
+
+    HDFSStore result = null;
+    synchronized (this) {
+      if (this.cache instanceof GemFireCacheImpl) {
+        GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache;
+        if (gfc.findHDFSStore(name) != null) {
+          throw new StoreExistsException(name);
+        }
+
+        HDFSStoreImpl hsi = new HDFSStoreImpl(name, this.configHolder);
+        gfc.addHDFSStore(hsi);
+        result = hsi;
+      }
+    }
+    return result;
+  }
+
+  public static final String getEventQueueName(String regionPath) {
+    return HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS + "_"
+        + regionPath.replace('/', '_');
+  }
+
+  public HDFSStore getConfigView() {
+    return (HDFSStore) configHolder;
+  }
+}
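The create() method above is a check-then-register idiom: under a lock, it rejects duplicate names and only then constructs and registers the store. A minimal, self-contained sketch of that idiom; NamedRegistry and its exceptions are illustrative stand-ins, not Geode APIs:

    import java.util.HashMap;
    import java.util.Map;

    class NamedRegistry<T> {
      private final Map<String, T> stores = new HashMap<String, T>();

      // Registers the value under name, or fails if the name is taken.
      public synchronized T create(String name, T value) {
        if (name == null) {
          throw new IllegalArgumentException("store name not provided");
        }
        if (stores.containsKey(name)) {
          throw new IllegalStateException("store already exists: " + name);
        }
        stores.put(name, value);
        return value;
      }
    }

Holding the lock across both the lookup and the insert is what makes the duplicate check reliable; checking first and registering later without the lock would race.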
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
new file mode 100644
index 0000000..8e7e358
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
@@ -0,0 +1,565 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.util.SingletonCallable;
+import com.gemstone.gemfire.internal.util.SingletonValue;
+import com.gemstone.gemfire.internal.util.SingletonValue.SingletonBuilder;
+
+/**
+ * Represents an HDFS-based persistent store for region data.
+ *
+ * @author Ashvin Agrawal
+ */
+public class HDFSStoreImpl implements HDFSStore {
+
+  private volatile HDFSStoreConfigHolder configHolder;
+
+  private final SingletonValue<FileSystem> fs;
+
+  /**
+   * Used to make sure that only one thread creates the writer at a time.
+   * This prevents the dispatcher threads from cascading on the Connection
+   * lock in the DFS client; see bug 51195.
+   */
+  private final SingletonCallable<HoplogWriter> singletonWriter = new SingletonCallable<HoplogWriter>();
+
+  private final HFileStoreStatistics stats;
+  private final BlockCache blockCache;
+
+  private static HashSet<String> secureNameNodes = new HashSet<String>();
+
+  private final boolean PERFORM_SECURE_HDFS_CHECK = Boolean.getBoolean(HoplogConfig.PERFORM_SECURE_HDFS_CHECK_PROP);
+  private static final Logger logger = LogService.getLogger();
+  protected final String logPrefix;
+
+  static {
+    HdfsConfiguration.init();
+  }
+
+  public HDFSStoreImpl(String name, final HDFSStore config) {
+    this.configHolder = new HDFSStoreConfigHolder(config);
+    configHolder.setName(name);
+
+    this.logPrefix = "<" + "HdfsStore:" + name + "> ";
+
+    stats = new HFileStoreStatistics(InternalDistributedSystem.getAnyInstance(), "HDFSStoreStatistics", name);
+
+    final Configuration hconf = new Configuration();
+
+    // Set the block cache size.
+    // Disable the static block cache. We keep our own cache on the HDFS Store
+    // hconf.setFloat("hfile.block.cache.size", 0f);
+    if (this.getBlockCacheSize() != 0) {
+      long cacheSize = (long) (HeapMemoryMonitor.getTenuredPoolMaxMemory() * this.getBlockCacheSize() / 100);
+
+      // TODO use an off heap block cache if we're using off heap memory?
+      // See CacheConfig.instantiateBlockCache.
+      // According to Anthony, the off heap block cache is still
+      // experimental. Our own off heap cache might be a better bet.
+//      this.blockCache = new LruBlockCache(cacheSize,
+//          StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf, HFileSortedOplogFactory.convertStatistics(stats));
+      this.blockCache = new LruBlockCache(cacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf);
+    } else {
+      this.blockCache = null;
+    }
+
+    final String clientFile = config.getHDFSClientConfigFile();
+    fs = new SingletonValue<FileSystem>(new SingletonBuilder<FileSystem>() {
+      @Override
+      public FileSystem create() throws IOException {
+        return createFileSystem(hconf, clientFile, false);
+      }
+
+      @Override
+      public void postCreate() {
+      }
+
+      @Override
+      public void createInProgress() {
+      }
+    });
+
+    FileSystem fileSystem = null;
+    try {
+      fileSystem = fs.get();
+    } catch (Throwable ex) {
+      throw new HDFSIOException(ex.getMessage(), ex);
+    }
+    // HDFSCompactionConfig has already been initialized
+    long cleanUpIntervalMillis = getHDFSCompactionConfig().getOldFilesCleanupIntervalMins() * 60 * 1000;
+    Path cleanUpIntervalPath = new Path(getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
+    HoplogUtil.exposeCleanupIntervalMillis(fileSystem, cleanUpIntervalPath, cleanUpIntervalMillis);
+  }
+
+  /**
+   * Creates a new file system every time.
+   */
+  public FileSystem createFileSystem() {
+    Configuration hconf = new Configuration();
+    try {
+      return createFileSystem(hconf, this.getHDFSClientConfigFile(), true);
+    } catch (Throwable ex) {
+      throw new HDFSIOException(ex.getMessage(), ex);
+    }
+  }
+
+  private FileSystem createFileSystem(Configuration hconf, String configFile, boolean forceNew) throws IOException {
+    FileSystem filesystem = null;
+
+    // load the hdfs client config file if specified. The path is on the
+    // local file system.
+    if (configFile != null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Adding resource config file to hdfs configuration:" + configFile, logPrefix);
+      }
+      hconf.addResource(new Path(configFile));
+
+      if (!new File(configFile).exists()) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_CLIENT_CONFIG_FILE_ABSENT, configFile));
+      }
+    }
+
+    // This setting disables the shutdown hook for the file system object.
+    // The shutdown hook may cause the FS object to close before the cache or
+    // store, leading to unpredictable behavior. This setting is provided for
+    // GFXD-like server use cases where FS close is managed by a server. This
+    // setting is not supported by old versions of hadoop; see HADOOP-4829.
+    hconf.setBoolean("fs.automatic.close", false);
+
+    // Hadoop has a configuration parameter io.serializations that is a list
+    // of serialization classes which can be used for obtaining serializers
+    // and deserializers. This parameter by default contains avro classes.
+    // When a sequence file is created, it calls
+    // SerializationFactory.getSerializer(keyclass). This internally creates
+    // objects, using reflection, of all the classes that were part of
+    // io.serializations. But since no avro class is available, it throws an
+    // exception. Before creating a sequence file, override the
+    // io.serializations parameter and pass only the classes that are
+    // important to us.
+    hconf.setStrings("io.serializations",
+        new String[] { "org.apache.hadoop.io.serializer.WritableSerialization" });
+    // create writer
+
+    SchemaMetrics.configureGlobally(hconf);
+
+    String nameNodeURL = null;
+    if ((nameNodeURL = getNameNodeURL()) == null) {
+      nameNodeURL = hconf.get("fs.default.name");
+    }
+
+    URI namenodeURI = URI.create(nameNodeURL);
+
+    //if (! GemFireCacheImpl.getExisting().isHadoopGfxdLonerMode()) {
+    String authType = hconf.get("hadoop.security.authentication");
+
+    // The following code handles Gemfire XD with secure HDFS.
+    // A static set is used to cache all known secure HDFS NameNode urls.
+    UserGroupInformation.setConfiguration(hconf);
+
+    // Compare the authentication method ignoring case to keep GFXD compliant
+    // with future versions. At least version 2.0.2 starts complaining if the
+    // string "kerberos" is not all lower case, although the current version
+    // of hadoop accepts the authType in any case.
+    if (authType.equalsIgnoreCase("kerberos")) {
+
+      String principal = hconf.get(HoplogConfig.KERBEROS_PRINCIPAL);
+      String keyTab = hconf.get(HoplogConfig.KERBEROS_KEYTAB_FILE);
+
+      if (!PERFORM_SECURE_HDFS_CHECK) {
+        if (logger.isDebugEnabled())
+          logger.debug("{}Ignore secure hdfs check", logPrefix);
+      } else {
+        if (!secureNameNodes.contains(nameNodeURL)) {
+          if (logger.isDebugEnabled())
+            logger.debug("{}Executing secure hdfs check", logPrefix);
+          try {
+            filesystem = FileSystem.newInstance(namenodeURI, hconf);
+            // Make sure no IOExceptions are generated when accessing insecure HDFS.
+            filesystem.listFiles(new Path("/"), false);
+            throw new HDFSIOException("Gemfire XD HDFS client and HDFS cluster security levels do not match. The configured HDFS Namenode is not secured.");
+          } catch (IOException ex) {
+            secureNameNodes.add(nameNodeURL);
+          } finally {
+            // Close the filesystem to avoid a resource leak
+            if (filesystem != null) {
+              closeFileSystemIgnoreError(filesystem);
+            }
+          }
+        }
+      }
+
+      // check to ensure the namenode principal is defined
+      String nameNodePrincipal = hconf.get("dfs.namenode.kerberos.principal");
+      if (nameNodePrincipal == null) {
+        throw new IOException(LocalizedStrings.GF_KERBEROS_NAMENODE_PRINCIPAL_UNDEF.toLocalizedString());
+      }
+
+      // ok, the user specified a gfxd principal so we will try to login
+      if (principal != null) {
+        // If the NameNode principal is the same as the Gemfire XD principal,
+        // there is a potential security hole
+        String regex = "[/@]";
+        if (nameNodePrincipal != null) {
+          String HDFSUser = nameNodePrincipal.split(regex)[0];
+          String GFXDUser = principal.split(regex)[0];
+          if (HDFSUser.equals(GFXDUser)) {
+            logger.warn(LocalizedMessage.create(LocalizedStrings.HDFS_USER_IS_SAME_AS_GF_USER, GFXDUser));
+          }
+        }
+
+        // a keytab must exist if the user specifies a principal
+        if (keyTab == null) {
+          throw new IOException(LocalizedStrings.GF_KERBEROS_KEYTAB_UNDEF.toLocalizedString());
+        }
+
+        // the keytab must exist as well
+        File f = new File(keyTab);
+        if (!f.exists()) {
+          throw new FileNotFoundException(LocalizedStrings.GF_KERBEROS_KEYTAB_FILE_ABSENT.toLocalizedString(f.getAbsolutePath()));
+        }
+
+        // Authenticate the Gemfire XD principal to the Kerberos KDC using
+        // the Gemfire XD keytab file
+        String principalWithValidHost = SecurityUtil.getServerPrincipal(principal, "");
+        UserGroupInformation.loginUserFromKeytab(principalWithValidHost, keyTab);
+      } else {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GF_KERBEROS_PRINCIPAL_UNDEF));
+      }
+    }
+    //}
+
+    filesystem = getFileSystemFactory().create(namenodeURI, hconf, forceNew);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Initialized FileSystem linked to " + filesystem.getUri()
+          + " " + filesystem.hashCode(), logPrefix);
+    }
+    return filesystem;
+  }
+
+  public FileSystem getFileSystem() throws IOException {
+    return fs.get();
+  }
+
+  public FileSystem getCachedFileSystem() {
+    return fs.getCachedValue();
+  }
+
+  public SingletonCallable<HoplogWriter> getSingletonWriter() {
+    return this.singletonWriter;
+  }
+
+  private final SingletonCallable<Boolean> fsExists = new SingletonCallable<Boolean>();
+
+  public boolean checkFileSystemExists() throws IOException {
+    try {
+      return fsExists.runSerially(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          FileSystem fileSystem = getCachedFileSystem();
+          if (fileSystem == null) {
+            return false;
+          }
+          return fileSystem.exists(new Path("/"));
+        }
+      });
+    } catch (Exception e) {
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      }
+      throw new IOException(e);
+    }
+  }
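createFileSystem() above quietly rewires two Hadoop client settings before opening the connection: it disables the automatic shutdown hook and strips the serializer list down to WritableSerialization. A condensed, self-contained sketch of the same overrides against the stock Hadoop API; the namenode URL is illustrative:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class ClientConfigExample {
      public static FileSystem open(String nameNodeUrl) throws Exception {
        Configuration conf = new Configuration();
        // Keep the FS object alive until the owning store closes it explicitly.
        conf.setBoolean("fs.automatic.close", false);
        // Restrict serializers to the Writable implementation so sequence file
        // creation does not reflectively load unavailable Avro classes.
        conf.setStrings("io.serializations",
            "org.apache.hadoop.io.serializer.WritableSerialization");
        // newInstance bypasses Hadoop's internal FS cache, so the caller owns
        // the returned object and is responsible for closing it.
        return FileSystem.newInstance(URI.create(nameNodeUrl), conf);
      }
    }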
+
+  /**
+   * This method executes a query on the namenode. If the query succeeds, the
+   * FS instance is healthy. If it fails, the old instance is closed and a
+   * new instance is created.
+   */
+  public void checkAndClearFileSystem() {
+    FileSystem fileSystem = getCachedFileSystem();
+
+    if (fileSystem != null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Checking file system at " + fileSystem.getUri(), logPrefix);
+      }
+      try {
+        checkFileSystemExists();
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}FS client is ok: " + fileSystem.getUri() + " "
+              + fileSystem.hashCode(), logPrefix);
+        }
+        return;
+      } catch (ConnectTimeoutException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}Hdfs unreachable, FS client is ok: "
+              + fileSystem.getUri() + " " + fileSystem.hashCode(), logPrefix);
+        }
+        return;
+      } catch (IOException e) {
+        logger.debug("IOError in filesystem checkAndClear ", e);
+
+        // The file system is closed or the NN is not reachable. It is safest
+        // to create a new FS instance. If the NN continues to remain
+        // unavailable, all subsequent read/write requests will cause
+        // HDFSIOException. This is similar to the way hbase manages
+        // failures. This has a drawback though: a network blip will result
+        // in all connections being recreated. However, trying to preserve
+        // the connections and waiting for the FS to auto-recover is not
+        // deterministic.
+        if (e instanceof RemoteException) {
+          e = ((RemoteException) e).unwrapRemoteException();
+        }
+
+        logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_UNREACHABLE,
+            fileSystem.getUri()), e);
+      }
+
+      // compare and clear the FS container. The fs container needs to be
+      // reusable.
+      boolean result = fs.clear(fileSystem, true);
+      if (!result) {
+        // the FS instance changed after this call was initiated. Check again.
+        logger.debug("{}Failed to clear FS ! I am inconsistent so retrying ..", logPrefix);
+        checkAndClearFileSystem();
+      } else {
+        closeFileSystemIgnoreError(fileSystem);
+      }
+    }
+  }
+
+  private void closeFileSystemIgnoreError(FileSystem fileSystem) {
+    if (fileSystem == null) {
+      logger.debug("{}Trying to close null file system", logPrefix);
+      return;
+    }
+
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Closing file system at " + fileSystem.getUri() + " "
+            + fileSystem.hashCode(), logPrefix);
+      }
+      fileSystem.close();
+    } catch (Exception e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Failed to close file system at " + fileSystem.getUri()
+            + " " + fileSystem.hashCode(), e);
+      }
+    }
+  }
+
+  public HFileStoreStatistics getStats() {
+    return stats;
+  }
+
+  public BlockCache getBlockCache() {
+    return blockCache;
+  }
+
+  public void close() {
+    logger.debug("{}Closing file system: " + getName(), logPrefix);
+    stats.close();
+    if (blockCache != null) {
+      // blockCache is null when the block cache size is configured as 0
+      blockCache.shutdown();
+    }
+    // Might want to clear the block cache, but it should be dereferenced.
+
+    // release the DDL hoplog organizer for this store. Also shut down the
+    // compaction threads. These two resources hold references to the
+    // GemfireCacheImpl instance. Any error in releasing these resources is
+    // not critical and can be ignored.
+    try {
+      HDFSCompactionManager manager = HDFSCompactionManager.getInstance(this);
+      if (manager != null) {
+        manager.reset();
+      }
+    } catch (Exception e) {
+      logger.info(e);
+    }
+
+    // once this store is closed, this store should not be used again
+    FileSystem fileSystem = fs.clear(false);
+    if (fileSystem != null) {
+      closeFileSystemIgnoreError(fileSystem);
+    }
+  }
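checkAndClearFileSystem() is an instance of a general self-healing-handle idiom: probe the cached connection, atomically swap it out on failure, and retry when another thread wins the race. A generic sketch of that idiom; SelfHealingHandle and its two callback interfaces are illustrative stand-ins, not GemFire types:

    import java.util.concurrent.atomic.AtomicReference;

    class SelfHealingHandle<T> {
      interface Probe<T> { boolean healthy(T value); }
      interface Factory<T> { T create(); }

      private final AtomicReference<T> ref = new AtomicReference<T>();

      // Returns a healthy handle, replacing the cached one if the probe fails.
      T get(Probe<T> probe, Factory<T> factory) {
        while (true) {
          T current = ref.get();
          if (current != null && probe.healthy(current)) {
            return current;
          }
          T fresh = factory.create();
          if (ref.compareAndSet(current, fresh)) {
            return fresh;
          }
          // Another thread replaced the handle first; loop and re-check.
        }
      }
    }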
+
+  /**
+   * Test hook to remove all of the contents of the folder
+   * for this HDFS store from HDFS.
+   * @throws IOException
+   */
+  public void clearFolder() throws IOException {
+    getFileSystem().delete(new Path(getHomeDir()), true);
+  }
+
+  @Override
+  public void destroy() {
+    Collection regions = HDFSRegionDirector.getInstance().getRegionsInStore(this);
+    if (!regions.isEmpty()) {
+      throw new IllegalStateException("Cannot destroy a HDFS store that still contains regions: " + regions);
+    }
+    close();
+    HDFSStoreDirector.getInstance().removeHDFSStore(this.getName());
+  }
+
+  @Override
+  public synchronized HDFSStore alter(HDFSStoreMutator mutator) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Altering hdfsStore " + this, logPrefix);
+      logger.debug("{}Mutator " + mutator, logPrefix);
+    }
+    HDFSStoreConfigHolder newHolder = new HDFSStoreConfigHolder(configHolder);
+    newHolder.copyFrom(mutator);
+    newHolder.getHDFSCompactionConfig().validate();
+    HDFSStore oldStore = configHolder;
+    configHolder = newHolder;
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Result of alter " + this, logPrefix);
+    }
+    return (HDFSStore) oldStore;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("HDFSStoreImpl [");
+    if (configHolder != null) {
+      builder.append("configHolder=");
+      builder.append(configHolder);
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+
+  @Override
+  public String getName() {
+    return configHolder.getName();
+  }
+
+  @Override
+  public String getNameNodeURL() {
+    return configHolder.getNameNodeURL();
+  }
+
+  @Override
+  public String getHomeDir() {
+    return configHolder.getHomeDir();
+  }
+
+  @Override
+  public String getHDFSClientConfigFile() {
+    return configHolder.getHDFSClientConfigFile();
+  }
+
+  @Override
+  public float getBlockCacheSize() {
+    return configHolder.getBlockCacheSize();
+  }
+
+  @Override
+  public int getMaxFileSize() {
+    return configHolder.getMaxFileSize();
+  }
+
+  @Override
+  public int getFileRolloverInterval() {
+    return configHolder.getFileRolloverInterval();
+  }
+
+  @Override
+  public boolean getMinorCompaction() {
+    return configHolder.getMinorCompaction();
+  }
+
+  @Override
+  public HDFSEventQueueAttributes getHDFSEventQueueAttributes() {
+    return configHolder.getHDFSEventQueueAttributes();
+  }
+
+  @Override
+  public HDFSCompactionConfig getHDFSCompactionConfig() {
+    return (HDFSCompactionConfig) configHolder.getHDFSCompactionConfig();
+  }
+
+  @Override
+  public HDFSStoreMutator createHdfsStoreMutator() {
+    return new HDFSStoreMutatorImpl();
+  }
+
+  public FileSystemFactory getFileSystemFactory() {
+    return new DistributedFileSystemFactory();
+  }
+
+  /*
+   * Factory to create HDFS file system instances
+   */
+  static public interface FileSystemFactory {
+    public FileSystem create(URI namenode, Configuration conf, boolean forceNew) throws IOException;
+  }
+
+  /*
+   * File system factory implementation for creating instances of file system
+   * connected to a distributed HDFS cluster
+   */
+  public class DistributedFileSystemFactory implements FileSystemFactory {
+    private final boolean ALLOW_TEST_FILE_SYSTEM = Boolean.getBoolean(HoplogConfig.ALLOW_LOCAL_HDFS_PROP);
+    private final boolean USE_FS_CACHE = Boolean.getBoolean(HoplogConfig.USE_FS_CACHE);
+
+    @Override
+    public FileSystem create(URI nn, Configuration conf, boolean create) throws IOException {
+      FileSystem filesystem;
+
+      if (USE_FS_CACHE && !create) {
+        filesystem = FileSystem.get(nn, conf);
+      } else {
+        filesystem = FileSystem.newInstance(nn, conf);
+      }
+
+      if (filesystem instanceof LocalFileSystem && !ALLOW_TEST_FILE_SYSTEM) {
+        closeFileSystemIgnoreError(filesystem);
+        throw new IllegalStateException(
+            LocalizedStrings.HOPLOG_TRYING_TO_CREATE_STANDALONE_SYSTEM.toLocalizedString(getNameNodeURL()));
+      }
+
+      return filesystem;
+    }
+  }
+}
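DistributedFileSystemFactory picks between Hadoop's process-wide FileSystem cache and a private instance. The difference matters because a cached object is shared, so closing it affects every holder. A small demonstration using the stock Hadoop calls; the namenode URL is illustrative:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class FsCacheDemo {
      public static void main(String[] args) throws Exception {
        URI nn = URI.create("hdfs://localhost:8020"); // illustrative URL
        Configuration conf = new Configuration();

        // get() consults Hadoop's process-wide cache: both calls may return
        // the very same object, so closing one closes the other.
        FileSystem shared1 = FileSystem.get(nn, conf);
        FileSystem shared2 = FileSystem.get(nn, conf);
        System.out.println("cached instances identical: " + (shared1 == shared2));

        // newInstance() always constructs a private connection the caller owns.
        FileSystem own = FileSystem.newInstance(nn, conf);
        System.out.println("private instance identical: " + (own == shared1));
        own.close();
      }
    }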
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
new file mode 100644
index 0000000..e4e2093
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
@@ -0,0 +1,264 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributesFactory;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreConfigHolder.AbstractHDFSCompactionConfigHolder;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+public class HDFSStoreMutatorImpl implements HDFSStoreMutator {
+  private HDFSStoreConfigHolder configHolder;
+  private Boolean autoCompact;
+  private HDFSCompactionConfigMutator compactionMutator;
+  private HDFSEventQueueAttributesMutator qMutator;
+
+  public HDFSStoreMutatorImpl() {
+    configHolder = new HDFSStoreConfigHolder();
+    configHolder.resetDefaultValues();
+    compactionMutator = new HDFSCompactionConfigMutatorImpl(configHolder.getHDFSCompactionConfig());
+    qMutator = new HDFSEventQueueAttributesMutatorImpl(null);
+  }
+
+  public HDFSStoreMutatorImpl(HDFSStore store) {
+    configHolder = new HDFSStoreConfigHolder(store);
+    compactionMutator = new HDFSCompactionConfigMutatorImpl(configHolder.getHDFSCompactionConfig());
+    // The following two steps are needed to set the null boolean values in
+    // compactionMutator
+    configHolder.setMinorCompaction(configHolder.getMinorCompaction());
+    compactionMutator.setAutoMajorCompaction(configHolder.getHDFSCompactionConfig().getAutoMajorCompaction());
+    qMutator = new HDFSEventQueueAttributesMutatorImpl(configHolder.getHDFSEventQueueAttributes());
+  }
+
+  public HDFSStoreMutator setMaxFileSize(int maxFileSize) {
+    configHolder.setMaxFileSize(maxFileSize);
+    return this;
+  }
+  @Override
+  public int getMaxFileSize() {
+    return configHolder.getMaxFileSize();
+  }
+
+  @Override
+  public HDFSStoreMutator setFileRolloverInterval(int count) {
+    configHolder.setFileRolloverInterval(count);
+    return this;
+  }
+  @Override
+  public int getFileRolloverInterval() {
+    return configHolder.getFileRolloverInterval();
+  }
+
+  @Override
+  public HDFSCompactionConfigMutator setMinorCompaction(boolean auto) {
+    autoCompact = Boolean.valueOf(auto);
+    configHolder.setMinorCompaction(auto);
+    return null;
+  }
+  @Override
+  public Boolean getMinorCompaction() {
+    return autoCompact;
+  }
+
+  @Override
+  public HDFSCompactionConfigMutator getCompactionConfigMutator() {
+    return compactionMutator;
+  }
+
+  @Override
+  public HDFSEventQueueAttributesMutator getHDFSEventQueueAttributesMutator() {
+    return qMutator;
+  }
+
+  public static class HDFSEventQueueAttributesMutatorImpl implements HDFSEventQueueAttributesMutator {
+    private HDFSEventQueueAttributesFactory factory = new HDFSEventQueueAttributesFactory();
+    int batchSize = -1;
+    int batchInterval = -1;
+
+    public HDFSEventQueueAttributesMutatorImpl(HDFSEventQueueAttributes qAttrs) {
+      if (qAttrs == null) {
+        return;
+      }
+
+      setBatchSizeMB(qAttrs.getBatchSizeMB());
+      setBatchTimeInterval(qAttrs.getBatchTimeInterval());
+    }
+
+    @Override
+    public HDFSEventQueueAttributesMutator setBatchSizeMB(int size) {
+      factory.setBatchSizeMB(size);
+      batchSize = size;
+      // call factory.set to execute attribute value validation
+      return this;
+    }
+    @Override
+    public int getBatchSizeMB() {
+      return batchSize;
+    }
+
+    @Override
+    public HDFSEventQueueAttributesMutator setBatchTimeInterval(int interval) {
+      batchInterval = interval;
+      // call factory.set to execute attribute value validation
+      factory.setBatchTimeInterval(interval);
+      return this;
+    }
+    @Override
+    public int getBatchTimeInterval() {
+      return batchInterval;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("HDFSEventQueueAttributesMutatorImpl [");
+      if (batchSize > -1) {
+        builder.append("batchSize=");
+        builder.append(batchSize);
+        builder.append(", ");
+      }
+      if (batchInterval > -1) {
+        builder.append("batchInterval=");
+        builder.append(batchInterval);
+      }
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
+  /**
+   * @author ashvina
+   */
+  public static class HDFSCompactionConfigMutatorImpl implements HDFSCompactionConfigMutator {
+    private AbstractHDFSCompactionConfigHolder configHolder;
+    private Boolean autoMajorCompact;
+
+    public HDFSCompactionConfigMutatorImpl(AbstractHDFSCompactionConfigHolder configHolder) {
+      this.configHolder = configHolder;
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setMaxInputFileSizeMB(int size) {
+      configHolder.setMaxInputFileSizeMB(size);
+      return this;
+    }
+    @Override
+    public int getMaxInputFileSizeMB() {
+      return configHolder.getMaxInputFileSizeMB();
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setMinInputFileCount(int count) {
+      configHolder.setMinInputFileCount(count);
+      return this;
+    }
+    @Override
+    public int getMinInputFileCount() {
+      return configHolder.getMinInputFileCount();
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setMaxInputFileCount(int count) {
+      configHolder.setMaxInputFileCount(count);
+      return this;
+    }
+    @Override
+    public int getMaxInputFileCount() {
+      return configHolder.getMaxInputFileCount();
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setMaxThreads(int count) {
+      configHolder.setMaxThreads(count);
+      return this;
+    }
+    @Override
+    public int getMaxThreads() {
+      return configHolder.getMaxThreads();
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setAutoMajorCompaction(boolean auto) {
+      autoMajorCompact = Boolean.valueOf(auto);
+      configHolder.setAutoMajorCompaction(auto);
+      return this;
+    }
+    @Override
+    public Boolean getAutoMajorCompaction() {
+      return autoMajorCompact;
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setMajorCompactionIntervalMins(int count) {
+      configHolder.setMajorCompactionIntervalMins(count);
+      return this;
+    }
+    @Override
+    public int getMajorCompactionIntervalMins() {
+      return configHolder.getMajorCompactionIntervalMins();
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setMajorCompactionMaxThreads(int count) {
+      configHolder.setMajorCompactionMaxThreads(count);
+      return this;
+    }
+    @Override
+    public int getMajorCompactionMaxThreads() {
+      return configHolder.getMajorCompactionMaxThreads();
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setOldFilesCleanupIntervalMins(
+        int interval) {
+      configHolder.setOldFilesCleanupIntervalMins(interval);
+      return this;
+    }
+    @Override
+    public int getOldFilesCleanupIntervalMins() {
+      return configHolder.getOldFilesCleanupIntervalMins();
+    }
+  }
+
+  public static void assertIsPositive(String name, int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
+              .toLocalizedString(new Object[] { name, count }));
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("HDFSStoreMutatorImpl [");
+    if (configHolder != null) {
+      builder.append("configHolder=");
+      builder.append(configHolder);
+      builder.append(", ");
+    }
+    if (autoCompact != null) {
+      builder.append("MinorCompaction=");
+      builder.append(autoCompact);
+      builder.append(", ");
+    }
+    if (compactionMutator.getAutoMajorCompaction() != null) {
+      builder.append("autoMajorCompaction=");
+      builder.append(compactionMutator.getAutoMajorCompaction());
+      builder.append(", ");
+    }
+    if (qMutator != null) {
+      builder.append("qMutator=");
+      builder.append(qMutator);
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+}
\ No newline at end of file
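A mutator built by this class is applied through HDFSStoreImpl.alter(...), shown earlier in this commit. A plausible call sequence, assuming a store reference; the numeric values are hypothetical:

    // Sketch: tune an existing store; assumes 'store' is an HDFSStoreImpl
    // obtained from the cache. Method names are the ones declared above.
    void tune(HDFSStoreImpl store) {
      HDFSStoreMutator mutator = store.createHdfsStoreMutator();
      mutator.setMaxFileSize(512);            // hypothetical value
      mutator.setFileRolloverInterval(3600);  // hypothetical value
      mutator.getCompactionConfigMutator().setMaxThreads(4);
      HDFSStore previous = store.alter(mutator); // returns the old config view
    }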
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
new file mode 100644
index 0000000..0787ee0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
@@ -0,0 +1,176 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * Listener that persists events to a write-only HDFS store
+ *
+ * @author Hemant Bhanawat
+ */
+public class HDFSWriteOnlyStoreEventListener implements
+    AsyncEventListener {
+
+  private final LogWriterI18n logger;
+  private volatile boolean senderStopped = false;
+  private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f);
+
+
+  public HDFSWriteOnlyStoreEventListener(LogWriterI18n logger) {
+    this.logger = logger;
+  }
+
+  @Override
+  public void close() {
+    senderStopped = true;
+  }
+
+  @Override
+  public boolean processEvents(List<AsyncEvent> events) {
+    if (Hoplog.NOP_WRITE) {
+      return true;
+    }
+
+    if (logger.fineEnabled())
+      logger.fine("HDFSWriteOnlyStoreEventListener: A total of " + events.size() + " events are sent from GemFire to persist on HDFS");
+    boolean success = false;
+    try {
+      failureTracker.sleepIfRetry();
+      HDFSGatewayEventImpl hdfsEvent = null;
+      int previousBucketId = -1;
+      BatchManager bm = null;
+      for (AsyncEvent asyncEvent : events) {
+        if (senderStopped) {
+          if (logger.fineEnabled()) {
+            logger.fine("HDFSWriteOnlyStoreEventListener.processEvents: Cache is closing down. Ignoring the batch of data.");
+          }
+          return false;
+        }
+        hdfsEvent = (HDFSGatewayEventImpl) asyncEvent;
+        if (previousBucketId != hdfsEvent.getBucketId()) {
+          if (previousBucketId != -1)
+            persistBatch(bm, previousBucketId);
+
+          previousBucketId = hdfsEvent.getBucketId();
+          bm = new BatchManager();
+        }
+        bm.addEvent(hdfsEvent);
+      }
+      try {
+        persistBatch(bm, hdfsEvent.getBucketId());
+      } catch (BucketMovedException e) {
+        logger.fine("Batch could not be written to HDFS as the bucket moved. bucket id: "
+            + hdfsEvent.getBucketId() + " Exception: " + e);
+        return false;
+      }
+      success = true;
+    } catch (IOException e) {
+      logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
+      return false;
+    }
+    catch (ClassNotFoundException e) {
+      logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
+      return false;
+    }
+    catch (CacheClosedException e) {
+      // exit silently
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    } catch (ForceReattemptException e) {
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    } catch (InterruptedException e1) {
+      // restore the interrupt flag and report failure instead of swallowing
+      // the interruption
+      Thread.currentThread().interrupt();
+      return false;
+    } finally {
+      failureTracker.record(success);
+    }
+    return true;
+  }
+
+  /**
+   * Persists batches of multiple regions specified by the batch manager
+   *
+   */
+  private void persistBatch(BatchManager bm, int bucketId) throws IOException, ForceReattemptException {
+    Iterator<Map.Entry<LocalRegion, ArrayList<QueuedPersistentEvent>>> eventsPerRegion =
+        bm.iterator();
+    HoplogOrganizer bucketOrganizer = null;
+    while (eventsPerRegion.hasNext()) {
+      Map.Entry<LocalRegion, ArrayList<QueuedPersistentEvent>> eventsForARegion = eventsPerRegion.next();
+      bucketOrganizer = getOrganizer((PartitionedRegion) eventsForARegion.getKey(), bucketId);
+      // bucket organizer cannot be null.
+      if (bucketOrganizer == null)
+        throw new BucketMovedException("Bucket moved. BucketID: " + bucketId + " HdfsRegion: " + eventsForARegion.getKey().getName());
+      bucketOrganizer.flush(eventsForARegion.getValue().iterator(), eventsForARegion.getValue().size());
+      if (logger.fineEnabled()) {
+        logger.fine("Batch written to HDFS of size " + eventsForARegion.getValue().size()
            + " for region " + eventsForARegion.getKey());
+      }
+    }
+  }
+
+  private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
+    BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
+    if (br == null) {
+      // got rebalanced or something
+      throw new BucketMovedException("Bucket region is no longer available. BucketId: "
+          + bucketId + " HdfsRegion: " + region.getName());
+    }
+
+    return br.getHoplogOrganizer();
+  }
+
+  /**
+   * Sorts events of the multiple regions into lists per region
+   *
+   */
+  private class BatchManager implements Iterable<Map.Entry<LocalRegion, ArrayList<QueuedPersistentEvent>>> {
+    private HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>> regionBatches =
+        new HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>>();
+
+    public void addEvent(HDFSGatewayEventImpl hdfsEvent) throws IOException, ClassNotFoundException {
+      LocalRegion region = (LocalRegion) hdfsEvent.getRegion();
+      ArrayList<QueuedPersistentEvent> regionList = regionBatches.get(region);
+      if (regionList == null) {
+        regionList = new ArrayList<QueuedPersistentEvent>();
+        regionBatches.put(region, regionList);
+      }
+      regionList.add(new UnsortedHDFSQueuePersistedEvent(hdfsEvent));
+    }
+
+    @Override
+    public Iterator<Map.Entry<LocalRegion, ArrayList<QueuedPersistentEvent>>> iterator() {
+      return regionBatches.entrySet().iterator();
+    }
+
+  }
+}
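processEvents() paces its retries through FailureTracker(10L, 60 * 1000L, 1.5f), whose implementation is not part of this commit. The constructor arguments read like a bounded exponential backoff (initial delay, cap, growth factor); a sketch of such a policy under that assumption, with RetryBackoff as a stand-in name, used from a single dispatcher thread:

    class RetryBackoff {
      private final long minDelayMs, maxDelayMs;
      private final float multiplier;
      private long delayMs = 0;

      RetryBackoff(long minDelayMs, long maxDelayMs, float multiplier) {
        this.minDelayMs = minDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.multiplier = multiplier;
      }

      // Mirrors failureTracker.record(success): a failure grows the delay,
      // a success resets it.
      void record(boolean success) {
        delayMs = success ? 0
            : Math.min(maxDelayMs, Math.max(minDelayMs, (long) (delayMs * multiplier)));
      }

      // Mirrors failureTracker.sleepIfRetry(): pause before reprocessing a
      // batch that previously failed.
      void sleepIfRetry() throws InterruptedException {
        if (delayMs > 0) {
          Thread.sleep(delayMs);
        }
      }
    }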
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
new file mode 100644
index 0000000..e2bef03
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
@@ -0,0 +1,64 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogListener;
+
+/**
+ * Objects of this class need to be created for every region. These objects
+ * listen to the oplog events and take appropriate action.
+ *
+ * @author Hemant Bhanawat
+ */
+public class HoplogListenerForRegion implements HoplogListener {
+
+  private List<HoplogListener> otherListeners = new CopyOnWriteArrayList<HoplogListener>();
+
+  public HoplogListenerForRegion() {
+
+  }
+
+  @Override
+  public void hoplogCreated(String regionFolder, int bucketId,
+      Hoplog... oplogs) throws IOException {
+    for (HoplogListener listener : this.otherListeners) {
+      listener.hoplogCreated(regionFolder, bucketId, oplogs);
+    }
+  }
+
+  @Override
+  public void hoplogDeleted(String regionFolder, int bucketId,
+      Hoplog... oplogs) {
+    for (HoplogListener listener : this.otherListeners) {
+      try {
+        listener.hoplogDeleted(regionFolder, bucketId, oplogs);
+      } catch (IOException e) {
+        // TODO handle
+        throw new HDFSIOException(e.getLocalizedMessage(), e);
+      }
+    }
+  }
+
+  public void addListener(HoplogListener listener) {
+    this.otherListeners.add(listener);
+  }
+
+  @Override
+  public void compactionCompleted(String region, int bucket, boolean isMajor) {
+    for (HoplogListener listener : this.otherListeners) {
+      listener.compactionCompleted(region, bucket, isMajor);
+    }
+  }
+}
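The listener above fans events out over a CopyOnWriteArrayList, which lets notification iterate over a stable snapshot while other threads register listeners concurrently; every mutation copies the backing array, so the structure suits listener sets with many reads and few writes. The same pattern reduced to a stand-alone sketch:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    class Broadcaster {
      interface Listener { void onEvent(String event); }

      // Iteration sees the array as it was when the loop began; concurrent
      // addListener() calls never throw ConcurrentModificationException.
      private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();

      void addListener(Listener l) { listeners.add(l); }

      void broadcast(String event) {
        for (Listener l : listeners) {
          l.onEvent(event);
        }
      }
    }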
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
new file mode 100644
index 0000000..e66e82d
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
@@ -0,0 +1,194 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.lru.Sizeable;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Event that is persisted in HDFS. As we need to persist some of the
+ * EntryEventImpl variables, we have created this class and have overridden
+ * the toData and fromData functions.
+ *
+ * There are subclasses of this class for the different types of persisted
+ * events: sorted vs. unsorted, and the persisted events we keep in the
+ * region queue, which need to hold the region key.
+ *
+ *
+ * @author Hemant Bhanawat
+ */
+public abstract class PersistedEventImpl {
+  protected Operation op = Operation.UPDATE;
+
+  protected Object valueObject;
+
+  /**
+   * A field with flags describing the event
+   */
+  protected byte flags;
+
+  // Flags indicating the type of value;
+  // if the value is not a byte array or object, it is an internal delta.
+  private static final byte VALUE_IS_BYTE_ARRAY = 0x01;
+  private static final byte VALUE_IS_OBJECT = (VALUE_IS_BYTE_ARRAY << 1);
+  private static final byte POSSIBLE_DUPLICATE = (VALUE_IS_OBJECT << 1);
+  private static final byte HAS_VERSION_TAG = (POSSIBLE_DUPLICATE << 1);
+
+
+  /** for deserialization */
+  public PersistedEventImpl() {
+  }
+
+  public PersistedEventImpl(Object value, Operation op, byte valueIsObject,
+      boolean isPossibleDuplicate, boolean hasVersionTag) throws IOException,
+      ClassNotFoundException {
+    this.op = op;
+    this.valueObject = value;
+    setFlag(VALUE_IS_BYTE_ARRAY, valueIsObject == 0x00);
+    setFlag(VALUE_IS_OBJECT, valueIsObject == 0x01);
+    setFlag(POSSIBLE_DUPLICATE, isPossibleDuplicate);
+    setFlag(HAS_VERSION_TAG, hasVersionTag);
+  }
+
+  private void setFlag(byte flag, boolean set) {
+    flags = (byte) (set ? flags | flag : flags & ~flag);
+  }
+
+  private boolean getFlag(byte flag) {
+    return (flags & flag) != 0x0;
+  }
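setFlag() and getFlag() pack several booleans into the single flags byte using shifted mask constants. A stand-alone illustration of the three bit operations involved:

    public class FlagBits {
      private static final byte VALUE_IS_BYTE_ARRAY = 0x01;
      private static final byte VALUE_IS_OBJECT = VALUE_IS_BYTE_ARRAY << 1;
      private static final byte POSSIBLE_DUPLICATE = VALUE_IS_OBJECT << 1;

      public static void main(String[] args) {
        byte flags = 0;
        // set: OR the mask in; clear: AND with the complement; test: mask out.
        flags = (byte) (flags | VALUE_IS_OBJECT);                // set a bit
        flags = (byte) (flags & ~POSSIBLE_DUPLICATE);            // clear a bit
        System.out.println((flags & VALUE_IS_OBJECT) != 0);      // true
        System.out.println((flags & VALUE_IS_BYTE_ARRAY) != 0);  // false
      }
    }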
+
+  public void toData(DataOutput out) throws IOException {
+    out.writeByte(this.op.ordinal);
+    out.writeByte(this.flags);
+
+    if (getFlag(VALUE_IS_BYTE_ARRAY)) {
+      DataSerializer.writeByteArray((byte[]) this.valueObject, out);
+    } else if (getFlag(VALUE_IS_OBJECT)) {
+      if (valueObject instanceof CachedDeserializable) {
+        CachedDeserializable cd = (CachedDeserializable) valueObject;
+        DataSerializer.writeObjectAsByteArray(cd.getValue(), out);
+      } else {
+        DataSerializer.writeObjectAsByteArray(valueObject, out);
+      }
+    }
+    else {
+      DataSerializer.writeObject(valueObject, out);
+    }
+  }
+
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.op = Operation.fromOrdinal(in.readByte());
+    this.flags = in.readByte();
+
+    if (getFlag(VALUE_IS_BYTE_ARRAY)) {
+      this.valueObject = DataSerializer.readByteArray(in);
+    } else if (getFlag(VALUE_IS_OBJECT)) {
+      byte[] newValueBytes = DataSerializer.readByteArray(in);
+      if (newValueBytes == null) {
+        this.valueObject = null;
+      } else {
+        if (CachedDeserializableFactory.preferObject()) {
+          this.valueObject = EntryEventImpl.deserialize(newValueBytes);
+        } else {
+          this.valueObject = CachedDeserializableFactory.create(newValueBytes);
+        }
+      }
+    }
+    else {
+      this.valueObject = DataSerializer.readObject(in);
+    }
+
+  }
+
+  /**
+   * Return the timestamp of this event. Depending on the subclass,
+   * this may be part of the version tag, or a separate field.
+   */
+  public abstract long getTimstamp();
+
+  protected boolean hasVersionTag() {
+    return getFlag(HAS_VERSION_TAG);
+  }
+
+  public Operation getOperation() {
+    return this.op;
+  }
+
+  public Object getValue() {
+    return this.valueObject;
+  }
+
+  public boolean isPossibleDuplicate() {
+    return getFlag(POSSIBLE_DUPLICATE);
+  }
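toData() and fromData() only work because both sides agree on field order: the op ordinal, then the flags byte, then a flag-dependent value encoding. A minimal round trip over java.io streams showing that discipline; the two byte values are stand-ins:

    import java.io.*;

    public class RoundTrip {
      public static void main(String[] args) throws IOException {
        // Write the fields in the order toData() uses ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutput out = new DataOutputStream(bytes);
        out.writeByte(4);    // stand-in for the Operation ordinal
        out.writeByte(0x02); // stand-in for the flags field

        // ... then read them back in exactly the same order, as fromData() does.
        DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println("op=" + in.readByte() + " flags=" + in.readByte());
      }
    }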
+
+  /**
+   * Returns the deserialized value.
+   *
+   */
+  public Object getDeserializedValue() throws IOException, ClassNotFoundException {
+    Object retVal = null;
+    if (getFlag(VALUE_IS_BYTE_ARRAY)) {
+      // value is a byte array
+      retVal = this.valueObject;
+    } else if (getFlag(VALUE_IS_OBJECT)) {
+      if (valueObject instanceof CachedDeserializable) {
+        retVal = ((CachedDeserializable) valueObject).getDeserializedForReading();
+      } else {
+        retVal = valueObject;
+      }
+    }
+    else {
+      // value is an object
+      retVal = this.valueObject;
+    }
+    return retVal;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder str = new StringBuilder(PersistedEventImpl.class.getSimpleName());
+    str.append("@").append(System.identityHashCode(this))
+        .append(" op:").append(op)
+        .append(" valueObject:").append(valueObject)
+        .append(" isPossibleDuplicate:").append(getFlag(POSSIBLE_DUPLICATE));
+    return str.toString();
+  }
+
+  public void copy(PersistedEventImpl usersValue) {
+    this.op = usersValue.op;
+    this.valueObject = usersValue.valueObject;
+    this.flags = usersValue.flags;
+  }
+
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = 0;
+
+    // value length
+    size += valueSize;
+
+    // one byte for op and one byte for flag
+    size += 2;
+
+    return size;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
new file mode 100644
index 0000000..d679003
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
@@ -0,0 +1,11 @@
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public interface QueuedPersistentEvent {
+
+  public byte[] getRawKey();
+
+  public void toHoplogEventBytes(DataOutput out) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
new file mode 100644
index 0000000..b9b1948
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
@@ -0,0 +1,107 @@
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tracks flushes using a queue of latches.
+ *
+ * @author bakera
+ */
+public class SignalledFlushObserver implements FlushObserver {
+  private static class FlushLatch extends CountDownLatch {
+    private final long seqnum;
+
+    public FlushLatch(long seqnum) {
+      super(1);
+      this.seqnum = seqnum;
+    }
+
+    public long getSequence() {
+      return seqnum;
+    }
+  }
+
+  // assume the number of outstanding flush requests is small so we don't
+  // need to organize by seqnum
+  private final List<FlushLatch> signals;
+
+  private final AtomicLong eventsReceived;
+  private final AtomicLong eventsDelivered;
+
+  public SignalledFlushObserver() {
+    signals = new ArrayList<FlushLatch>();
+    eventsReceived = new AtomicLong(0);
+    eventsDelivered = new AtomicLong(0);
+  }
+
+  @Override
+  public boolean shouldDrainImmediately() {
+    synchronized (signals) {
+      return !signals.isEmpty();
+    }
+  }
+
+  @Override
+  public AsyncFlushResult flush() {
+    final long seqnum = eventsReceived.get();
+    synchronized (signals) {
+      final FlushLatch flush;
+      if (seqnum <= eventsDelivered.get()) {
+        flush = null;
+      } else {
+        flush = new FlushLatch(seqnum);
+        signals.add(flush);
+      }
+
+      return new AsyncFlushResult() {
+        @Override
+        public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
+          return flush == null ? true : flush.await(timeout, unit);
+        }
+      };
+    }
+  }
+
+  /**
+   * Invoked when an event is received.
+   */
+  public void push() {
+    eventsReceived.incrementAndGet();
+  }
+
+  /**
+   * Invoked when a batch has been dispatched.
+   */
+  public void pop(int count) {
+    long highmark = eventsDelivered.addAndGet(count);
+    synchronized (signals) {
+      for (ListIterator<FlushLatch> iter = signals.listIterator(); iter.hasNext(); ) {
+        FlushLatch flush = iter.next();
+        if (flush.getSequence() <= highmark) {
+          flush.countDown();
+          iter.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * Invoked when the queue is cleared.
+   */
+  public void clear() {
+    synchronized (signals) {
+      for (FlushLatch flush : signals) {
+        flush.countDown();
+      }
+
+      signals.clear();
+      eventsReceived.set(0);
+      eventsDelivered.set(0);
+    }
+  }
+}
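A typical interaction with the observer above: a receiver thread calls push() per incoming event, the dispatcher calls pop(n) per delivered batch, and a caller of flush() blocks until delivery catches up to the sequence number it saw. A sketch, assuming the class above is on the classpath and that AsyncFlushResult is a nested interface of FlushObserver (that interface is not part of this commit):

    import java.util.concurrent.TimeUnit;

    public class FlushDemo {
      public static void main(String[] args) throws InterruptedException {
        final SignalledFlushObserver observer = new SignalledFlushObserver();
        observer.push();                 // receiver: event 1 arrived
        observer.push();                 // receiver: event 2 arrived

        new Thread(new Runnable() {
          public void run() {
            observer.pop(2);             // dispatcher: both events delivered
          }
        }).start();

        // Blocks until eventsDelivered reaches the sequence number captured
        // by flush(); prints "true" once the pop above lands.
        System.out.println(observer.flush().waitForFlush(5, TimeUnit.SECONDS));
      }
    }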
+ *========================================================================= + */ +package com.gemstone.gemfire.cache.hdfs.internal; + +import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory.HDFSCompactionConfigFactory; +import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreConfigHolder.AbstractHDFSCompactionConfigHolder; +import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; + +/** + * Class for authorization and validation of HDFS size Tiered compaction config + * + * @author ashvina + */ +public class SizeTieredHdfsCompactionConfigHolder extends AbstractHDFSCompactionConfigHolder { + @Override + public String getCompactionStrategy() { + return SIZE_ORIENTED; + } + + @Override + public HDFSCompactionConfigFactory setMaxInputFileSizeMB(int size) { + HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_SIZE_MB", size); + this.maxInputFileSizeMB = size; + return this; + } + + @Override + public HDFSCompactionConfigFactory setMinInputFileCount(int count) { + HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MIN_INPUT_FILE_COUNT", count); + this.minInputFileCount = count; + return this; + } + + @Override + public HDFSCompactionConfigFactory setMaxInputFileCount(int count) { + HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_COUNT", count); + this.maxInputFileCount = count; + return this; + } + + @Override + public HDFSCompactionConfigFactory setMajorCompactionIntervalMins(int count) { + HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_INTERVAL, count); + this.majorCompactionIntervalMins = count; + return this; + } + + @Override + public HDFSCompactionConfigFactory setMajorCompactionMaxThreads(int count) { + HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_THREADS, count); + this.majorCompactionConcurrency = count; + return this; + } + + @Override + protected void validate() { + if (minInputFileCount > maxInputFileCount) { + throw new IllegalArgumentException( + LocalizedStrings.HOPLOG_MIN_IS_MORE_THAN_MAX + .toLocalizedString(new Object[] { + "HDFS_COMPACTION_MIN_INPUT_FILE_COUNT", + minInputFileCount, + "HDFS_COMPACTION_MAX_INPUT_FILE_COUNT", + maxInputFileCount })); + } + super.validate(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java new file mode 100644 index 0000000..7bbcf7b --- /dev/null +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java @@ -0,0 +1,78 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
new file mode 100644
index 0000000..7bbcf7b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
@@ -0,0 +1,78 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * A persistent event that is stored in the hoplog queue. This class is only used
+ * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
+ * record in the file.
+ *
+ * @author dsmith
+ */
+public class SortedHDFSQueuePersistedEvent extends SortedHoplogPersistedEvent implements QueuedPersistentEvent {
+
+  /** key stored in serialized form */
+  protected byte[] keyBytes = null;
+
+  public SortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in) throws IOException,
+      ClassNotFoundException {
+    this(in.getSerializedValue(), in.getOperation(), in.getValueIsObject(),
+        in.getPossibleDuplicate(), in.getVersionTag(), in.getSerializedKey(),
+        in.getCreationTime());
+  }
+
+  public SortedHDFSQueuePersistedEvent(Object valueObject, Operation operation,
+      byte valueIsObject, boolean possibleDuplicate, VersionTag versionTag,
+      byte[] serializedKey, long timestamp) throws ClassNotFoundException, IOException {
+    super(valueObject, operation, valueIsObject, possibleDuplicate, versionTag, timestamp);
+    this.keyBytes = serializedKey;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeByteArray(this.keyBytes, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.keyBytes = DataSerializer.readByteArray(in);
+  }
+
+  @Override
+  public void toHoplogEventBytes(DataOutput out) throws IOException {
+    super.toData(out);
+  }
+
+  public byte[] getRawKey() {
+    return this.keyBytes;
+  }
+
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = SortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag);
+
+    size += keySize;
+
+    return size;
+  }
+}
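getSizeInBytes() composes with the sorted-event estimate: the queued form adds only the raw key bytes it carries on top. A quick check with made-up sizes:

    int keySize = 16, valueSize = 100;
    VersionTag tag = null; // sizing with version checks disabled

    int base = SortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, tag);
    int queued = SortedHDFSQueuePersistedEvent.getSizeInBytes(keySize, valueSize, tag);
    assert queued == base + keySize;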
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
new file mode 100644
index 0000000..a2bcba1
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
@@ -0,0 +1,106 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.ByteArrayDataInput;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+/**
+ * A persistent event that is stored in a sorted hoplog. In addition
+ * to the fields of PersistedEventImpl, this event has a version tag.
+ *
+ * This class should only be serialized by directly calling toData,
+ * which is why it does not implement DataSerializable.
+ *
+ * @author dsmith
+ */
+public class SortedHoplogPersistedEvent extends PersistedEventImpl {
+  /** version tag for concurrency checks */
+  protected VersionTag versionTag;
+
+  /** timestamp of the event. Used when version checks are disabled */
+  protected long timestamp;
+
+  public SortedHoplogPersistedEvent(Object valueObject, Operation operation,
+      byte valueIsObject, boolean possibleDuplicate, VersionTag tag, long timestamp)
+      throws ClassNotFoundException, IOException {
+    super(valueObject, operation, valueIsObject, possibleDuplicate, tag != null);
+    this.versionTag = tag;
+    this.timestamp = timestamp;
+  }
+
+  public SortedHoplogPersistedEvent() {
+    //for deserialization
+  }
+
+  @Override
+  public long getTimstamp() {
+    return versionTag == null ? timestamp : versionTag.getVersionTimeStamp();
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    if (versionTag == null) {
+      out.writeLong(timestamp);
+    } else {
+      //TODO optimize these
+      DataSerializer.writeObject(this.versionTag, out);
+    }
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    if (hasVersionTag()) {
+      this.versionTag = (VersionTag) DataSerializer.readObject(in);
+    } else {
+      this.timestamp = in.readLong();
+    }
+  }
+
+  /**
+   * @return the concurrency versioning tag for this event, if any
+   */
+  public VersionTag getVersionTag() {
+    return this.versionTag;
+  }
+
+  public static SortedHoplogPersistedEvent fromBytes(byte[] val)
+      throws IOException, ClassNotFoundException {
+    ByteArrayDataInput in = new ByteArrayDataInput();
+    in.initialize(val, null);
+    SortedHoplogPersistedEvent event = new SortedHoplogPersistedEvent();
+    event.fromData(in);
+    return event;
+  }
+
+  public void copy(PersistedEventImpl usersValue) {
+    super.copy(usersValue);
+    this.versionTag = ((SortedHoplogPersistedEvent) usersValue).versionTag;
+    this.timestamp = ((SortedHoplogPersistedEvent) usersValue).timestamp;
+  }
+
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag);
+
+    if (versionTag != null) {
+      size += versionTag.getSizeInBytes();
+    } else {
+      // size of timestamp
+      size += 8;
+    }
+
+    return size;
+  }
+}
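Because the class serializes by direct toData() calls rather than through DataSerializable, a round trip pairs any DataOutput with the static fromBytes(). A sketch with illustrative constructor arguments (a pre-serialized value, no version tag, wall-clock time):

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    SortedHoplogPersistedEvent event = new SortedHoplogPersistedEvent(
        new byte[] { 1, 2, 3 }, Operation.CREATE, (byte) 0, false, null,
        System.currentTimeMillis());
    event.toData(new DataOutputStream(buffer));

    SortedHoplogPersistedEvent copy =
        SortedHoplogPersistedEvent.fromBytes(buffer.toByteArray());
    assert copy.getTimstamp() == event.getTimstamp();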
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
new file mode 100644
index 0000000..7b0570b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
@@ -0,0 +1,68 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+/**
+ * A persistent event that is stored in the hoplog queue. This class is only used
+ * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
+ * record in the file.
+ *
+ * @author dsmith
+ */
+public class UnsortedHDFSQueuePersistedEvent extends UnsortedHoplogPersistedEvent implements QueuedPersistentEvent {
+
+  /** the bytes of the key for this entry */
+  protected byte[] keyBytes = null;
+
+  public UnsortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in) throws IOException,
+      ClassNotFoundException {
+    super(in.getValue(), in.getOperation(), in.getValueIsObject(), in.getPossibleDuplicate(),
+        in.getVersionTimeStamp() == 0 ? in.getCreationTime() : in.getVersionTimeStamp());
+    this.keyBytes = in.getSerializedKey();
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeByteArray(this.keyBytes, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.keyBytes = DataSerializer.readByteArray(in);
+  }
+
+  @Override
+  public void toHoplogEventBytes(DataOutput out) throws IOException {
+    super.toData(out);
+  }
+
+  public byte[] getRawKey() {
+    return this.keyBytes;
+  }
+
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = UnsortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag);
+
+    size += keySize;
+
+    return size;
+  }
+}
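The constructor's ternary is what gives every queued event a usable timestamp even when version checks are disabled: a zero version timestamp means nothing was recorded, so creation time stands in. The rule in isolation:

    long versionTimeStamp = 0L;           // nothing recorded by concurrency checks
    long creationTime = 1404416463000L;   // illustrative wall-clock time

    long effective = versionTimeStamp == 0 ? creationTime : versionTimeStamp;
    // effective == creationTime here; any non-zero version timestamp would win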
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
new file mode 100644
index 0000000..ee48b16
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
@@ -0,0 +1,84 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.ByteArrayDataInput;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+/**
+ * A persisted event that is stored in an unsorted (sequential) hoplog. This
+ * does not have a version stamp, just a timestamp for the entry.
+ *
+ * This class should only be serialized by calling toData directly, which
+ * is why it does not implement DataSerializable.
+ *
+ * @author dsmith
+ */
+public class UnsortedHoplogPersistedEvent extends PersistedEventImpl {
+  long timestamp;
+
+  public UnsortedHoplogPersistedEvent() {
+    //for deserialization
+  }
+
+  public UnsortedHoplogPersistedEvent(Object value, Operation op,
+      byte valueIsObject, boolean isPossibleDuplicate, long timestamp)
+      throws IOException, ClassNotFoundException {
+    super(value, op, valueIsObject, isPossibleDuplicate, false/*hasVersionTag*/);
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public long getTimstamp() {
+    return timestamp;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeLong(timestamp, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.timestamp = DataSerializer.readLong(in);
+  }
+
+  public static UnsortedHoplogPersistedEvent fromBytes(byte[] val)
+      throws IOException, ClassNotFoundException {
+    ByteArrayDataInput in = new ByteArrayDataInput();
+    in.initialize(val, null);
+    UnsortedHoplogPersistedEvent event = new UnsortedHoplogPersistedEvent();
+    event.fromData(in);
+    return event;
+  }
+
+  public void copy(PersistedEventImpl usersValue) {
+    super.copy(usersValue);
+    this.timestamp = ((UnsortedHoplogPersistedEvent) usersValue).timestamp;
+  }
+
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag);
+
+    // size of timestamp
+    size += 8;
+
+    return size;
+  }
+}
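With no version tag, the sorted and unsorted layouts cost the same: both fall back to a bare 8-byte timestamp on top of the common PersistedEventImpl estimate. A quick parity check that follows from the two getSizeInBytes() methods above:

    int sorted = SortedHoplogPersistedEvent.getSizeInBytes(8, 64, null);
    int unsorted = UnsortedHoplogPersistedEvent.getSizeInBytes(8, 64, null);
    assert sorted == unsorted; // they diverge only when a VersionTag is present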
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/Bits.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/Bits.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/Bits.java
new file mode 100644
index 0000000..06ed635
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/Bits.java
@@ -0,0 +1,45 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.cardinality;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+public class Bits
+{
+
+    public static int[] getBits(byte[] mBytes) throws IOException
+    {
+        int bitSize = mBytes.length / 4;
+        int[] bits = new int[bitSize];
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(mBytes));
+        for (int i = 0; i < bitSize; i++)
+        {
+            bits[i] = dis.readInt();
+        }
+        return bits;
+    }
+
+}
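getBits() is the read half of a trivially symmetric encoding: four big-endian bytes per int, exactly as DataOutputStream.writeInt() produces them. A round trip, for illustration:

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    for (int word : new int[] { 1, 2, 3, 4 }) {
      dos.writeInt(word);                      // 4 bytes per word
    }

    int[] decoded = Bits.getBits(baos.toByteArray());
    // decoded is { 1, 2, 3, 4 } again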
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/CardinalityMergeException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/CardinalityMergeException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/CardinalityMergeException.java
new file mode 100644
index 0000000..31d3f16
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/CardinalityMergeException.java
@@ -0,0 +1,33 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.cardinality;
+
+@SuppressWarnings("serial")
+public abstract class CardinalityMergeException extends Exception
+{
+    public CardinalityMergeException(String message)
+    {
+        super(message);
+    }
+}

+ * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf + *

+ * HLL is an improved version of LogLog that is capable of estimating + * the cardinality of a set with accuracy = 1.04/sqrt(m) where + * m = 2^b. So we can control accuracy vs space usage by increasing + * or decreasing b. + *

+ * The main benefit of using HLL over LL is that it only requires 64% + * of the space that LL does to get the same accuracy. + *

+ * This implementation implements a single counter. If a large (millions) + * number of counters are required you may want to refer to: + *

+ * http://dsiutils.dsi.unimi.it/ + *

+ * It has a more complex implementation of HLL that supports multiple counters + * in a single object, drastically reducing the java overhead from creating + * a large number of objects. + *

+ * This implementation leveraged a javascript implementation that Yammer has + * been working on: + *

+ * https://github.com/yammer/probablyjs + *

+ * Note that this implementation does not include the long range correction function + * defined in the original paper. Empirical evidence shows that the correction + * function causes more harm than good. + *

+ * + *

+ * Users have different motivations to use different types of hashing functions. + * Rather than try to keep up with all available hash functions and to remove + * the concern of causing future binary incompatibilities this class allows clients + * to offer the value in hashed int or long form. This way clients are free + * to change their hash function on their own time line. We recommend using Google's + * Guava Murmur3_128 implementation as it provides good performance and speed when + * high precision is required. In our tests the 32bit MurmurHash function included + * in this project is faster and produces better results than the 32 bit murmur3 + * implementation google provides. + *

+ */ +public class HyperLogLog implements ICardinality +{ + private final RegisterSet registerSet; + private final int log2m; + private final double alphaMM; + + + /** + * Create a new HyperLogLog instance using the specified standard deviation. + * + * @param rsd - the relative standard deviation for the counter. + * smaller values create counters that require more space. + */ + public HyperLogLog(double rsd) + { + this(log2m(rsd)); + } + + private static int log2m(double rsd) + { + return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2)); + } + + /** + * Create a new HyperLogLog instance. The log2m parameter defines the accuracy of + * the counter. The larger the log2m the better the accuracy. + *

+ * accuracy = 1.04/sqrt(2^log2m) + * + * @param log2m - the number of bits to use as the basis for the HLL instance + */ + public HyperLogLog(int log2m) + { + this(log2m, new RegisterSet((int) Math.pow(2, log2m))); + } + + /** + * Creates a new HyperLogLog instance using the given registers. Used for unmarshalling a serialized + * instance and for merging multiple counters together. + * + * @param registerSet - the initial values for the register set + */ + public HyperLogLog(int log2m, RegisterSet registerSet) + { + this.registerSet = registerSet; + this.log2m = log2m; + int m = (int) Math.pow(2, this.log2m); + + // See the paper. + switch (log2m) + { + case 4: + alphaMM = 0.673 * m * m; + break; + case 5: + alphaMM = 0.697 * m * m; + break; + case 6: + alphaMM = 0.709 * m * m; + break; + default: + alphaMM = (0.7213 / (1 + 1.079 / m)) * m * m; + } + } + + + @Override + public boolean offerHashed(long hashedValue) + { + // j becomes the binary address determined by the first b log2m of x + // j will be between 0 and 2^log2m + final int j = (int) (hashedValue >>> (Long.SIZE - log2m)); + final int r = Long.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1; + return registerSet.updateIfGreater(j, r); + } + + @Override + public boolean offerHashed(int hashedValue) + { + // j becomes the binary address determined by the first b log2m of x + // j will be between 0 and 2^log2m + final int j = hashedValue >>> (Integer.SIZE - log2m); + final int r = Integer.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1; + return registerSet.updateIfGreater(j, r); + } + + @Override + public boolean offer(Object o) + { + final int x = MurmurHash.hash(o); + return offerHashed(x); + } + + + @Override + public long cardinality() + { + double registerSum = 0; + int count = registerSet.count; + double zeros = 0.0; + for (int j = 0; j < registerSet.count; j++) + { + int val = registerSet.get(j); + registerSum += 1.0 / (1<, Serializable + { + private double rsd; + + public Builder(double rsd) + { + this.rsd = rsd; + } + + @Override + public HyperLogLog build() + { + return new HyperLogLog(rsd); + } + + @Override + public int sizeof() + { + int log2m = log2m(rsd); + int k = (int) Math.pow(2, log2m); + return RegisterSet.getBits(k) * 4; + } + + public static HyperLogLog build(byte[] bytes) throws IOException + { + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + DataInputStream oi = new DataInputStream(bais); + int log2m = oi.readInt(); + int size = oi.readInt(); + byte[] longArrayBytes = new byte[size]; + oi.readFully(longArrayBytes); + return new HyperLogLog(log2m, new RegisterSet((int) Math.pow(2, log2m), Bits.getBits(longArrayBytes))); + } + } + + @SuppressWarnings("serial") + protected static class HyperLogLogMergeException extends CardinalityMergeException + { + public HyperLogLogMergeException(String message) + { + super(message); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java new file mode 100644 index 0000000..935b2bb --- /dev/null +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java @@ -0,0 
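The accuracy figure in the class comment translates directly into sizing decisions. A small, hypothetical driver showing how rsd, log2m, and the register count relate, then the counter in action:

    double rsd = 0.03;                            // target ~3% standard error
    int log2m = (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2));
    int m = 1 << log2m;                           // registers allocated
    double accuracy = 1.04 / Math.sqrt(m);        // realized standard error
    System.out.println(log2m + " bits -> " + m + " registers, rsd ~ " + accuracy);

    HyperLogLog hll = new HyperLogLog(rsd);
    for (int i = 0; i < 100000; i++) {
      hll.offer("item-" + i);                     // hashed internally with MurmurHash
    }
    System.out.println(hll.cardinality());        // ~100000, within a few percent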
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java
new file mode 100644
index 0000000..935b2bb
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java
@@ -0,0 +1,32 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.cardinality;
+
+public interface IBuilder<T>
+{
+    T build();
+
+    int sizeof();
+}
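IBuilder separates predicting a counter's serialized footprint (sizeof) from constructing it (build); HyperLogLog.Builder above is this change's one implementation. For instance:

    HyperLogLog.Builder builder = new HyperLogLog.Builder(0.03);
    int footprint = builder.sizeof();   // register bytes needed, before building
    HyperLogLog counter = builder.build();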