Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 70932200AED for ; Tue, 3 May 2016 23:45:31 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6EEE81609F5; Tue, 3 May 2016 23:45:31 +0200 (CEST) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4B6A41609F4 for ; Tue, 3 May 2016 23:45:29 +0200 (CEST) Received: (qmail 75118 invoked by uid 500); 3 May 2016 21:45:28 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 75108 invoked by uid 99); 3 May 2016 21:45:28 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 May 2016 21:45:28 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 005F7CA0EE for ; Tue, 3 May 2016 21:45:28 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -0.216 X-Spam-Level: X-Spam-Status: No, score=-0.216 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.996, URIBL_BLACK=4] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id JFWsg72o9Eva for ; Tue, 3 May 2016 21:45:18 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 912095FBBE for ; Tue, 3 May 2016 21:45:16 +0000 (UTC) Received: (qmail 71912 invoked by uid 99); 3 May 2016 21:45:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 May 2016 21:45:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C2454DFBA8; Tue, 3 May 2016 21:45:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agingade@apache.org To: commits@geode.incubator.apache.org Date: Tue, 03 May 2016 21:45:28 -0000 Message-Id: In-Reply-To: <6d06faff2ab34d6f9ca4d51c864d9830@git.apache.org> References: <6d06faff2ab34d6f9ca4d51c864d9830@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/60] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS related code archived-at: Tue, 03 May 2016 21:45:31 -0000 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java deleted file mode 100644 index 1e6a034..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java +++ /dev/null @@ -1,471 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - - -import com.gemstone.gemfire.cache.CacheException; -import com.gemstone.gemfire.cache.EntryNotFoundException; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.i18n.LogWriterI18n; -import com.gemstone.gemfire.internal.SystemTimer; -import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask; -import com.gemstone.gemfire.internal.cache.ColocationHelper; -import com.gemstone.gemfire.internal.cache.ForceReattemptException; -import com.gemstone.gemfire.internal.cache.LocalRegion; -import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; -import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue; - -/** - * Parallel Gateway Sender Queue extended for HDFS functionality - * - */ -public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue { - - private int currentBucketIndex = 0; - private int elementsPeekedAcrossBuckets = 0; - private SystemTimer rollListTimer = null; - public static final String ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP = "gemfire.ROLL_SORTED_LIST_TIME_INTERVAL_MS"; - private final int ROLL_SORTED_LIST_TIME_INTERVAL_MS = Integer.getInteger(ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP, 3000); - - public HDFSParallelGatewaySenderQueue(AbstractGatewaySender sender, - Set userPRs, int idx, int nDispatcher) { - - super(sender, userPRs, idx, nDispatcher); - //only first dispatcher Hemant? - if (sender.getBucketSorted() && this.index == 0) { - rollListTimer = new SystemTimer(sender.getCache().getDistributedSystem(), - true); - // schedule the task to roll the skip lists - rollListTimer.scheduleAtFixedRate(new RollSortedListsTimerTask(), - ROLL_SORTED_LIST_TIME_INTERVAL_MS, ROLL_SORTED_LIST_TIME_INTERVAL_MS); - } - } - - @Override - public Object peek() throws InterruptedException, CacheException { - /* If you call peek and use super.peek it leads to the following exception. - * So I'm adding an explicit UnsupportedOperationException. - Caused by: java.lang.ClassCastException: com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue cannot be cast to com.gemstone.gemfire.internal.cache.BucketRegionQueue - at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.getRandomPrimaryBucket(ParallelGatewaySenderQueue.java:964) - at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.peek(ParallelGatewaySenderQueue.java:1078) - */ - throw new UnsupportedOperationException(); - } - - - @Override - public void cleanUp() { - super.cleanUp(); - cancelRollListTimer(); - } - - private void cancelRollListTimer() { - if (rollListTimer != null) { - rollListTimer.cancel(); - rollListTimer = null; - } - } - /** - * A call to this function peeks elements from the first local primary bucket. - * Next call to this function peeks elements from the next local primary - * bucket and so on. - */ - @Override - public List peek(int batchSize, int timeToWait) throws InterruptedException, - CacheException { - - List batch = new ArrayList(); - - int batchSizeInBytes = batchSize*1024*1024; - PartitionedRegion prQ = getRandomShadowPR(); - if (prQ == null || prQ.getLocalMaxMemory() == 0) { - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - blockProcesorThreadIfRequired(); - return batch; - } - - ArrayList list = null; - ArrayList pbuckets = new ArrayList(prQ - .getDataStore().getAllLocalPrimaryBucketIds()); - ArrayList buckets = new ArrayList(); - for(Integer i : pbuckets) { - if(i % this.nDispatcher == this.index) - buckets.add(i); - } - // In case of failures, peekedEvents would possibly have some elements - // add them. - if (this.resetLastPeeked) { - int previousBucketId = -1; - boolean stillPrimary = true; - Iterator iter = peekedEvents.iterator(); - // we need to remove the events of the bucket that are no more primary on - // this node as they cannot be persisted from this node. - while(iter.hasNext()) { - HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)iter.next(); - if (previousBucketId != hdfsEvent.getBucketId()){ - stillPrimary = buckets.contains(hdfsEvent.getBucketId()); - previousBucketId = hdfsEvent.getBucketId(); - } - if (stillPrimary) - batch.add(hdfsEvent); - else { - iter.remove(); - } - } - this.resetLastPeeked = false; - } - - if (buckets.size() == 0) { - // Sleep a bit before trying again. provided by Dan - try { - Thread.sleep(50); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return batch; - } - - if (this.sender.getBucketSorted()) { - - } - - // Each call to this function returns index of next bucket - // that is to be processed. This function takes care - // of the bucket sequence that is peeked by a sequence of - // peek calls. - // If there are bucket movements between two consecutive - // calls to this function then there is chance that a bucket - // is processed twice while another one is skipped. But, that is - // ok because in the next round, it will be processed. - Integer bIdIndex = getCurrentBucketIndex(buckets.size()); - - // If we have gone through all the buckets once and no - // elements were peeked from any of the buckets, take a nap. - // This always sleep in the first call but that should be ok - // because the timeToWait in practical use cases would be greater - // than this sleep of 100 ms. - if (bIdIndex == 0 && getAndresetElementsPeekedAcrossBuckets() == 0) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - HDFSBucketRegionQueue hrq = ((HDFSBucketRegionQueue)prQ - .getDataStore().getLocalBucketById(buckets.get(bIdIndex))); - - if (hrq == null) { - // bucket moved to another node after getAllLocalPrimaryBucketIds - // was called. Peeking not possible. return. - return batch; - } - long entriesWaitingTobePeeked = hrq.totalEntries(); - - if (entriesWaitingTobePeeked == 0) { - blockProcesorThreadIfRequired(); - return batch; - } - - long currentTimeInMillis = System.currentTimeMillis(); - long bucketSizeInBytes = hrq.getQueueSizeInBytes(); - if (((currentTimeInMillis - hrq.getLastPeekTimeInMillis()) > timeToWait) - || ( bucketSizeInBytes > batchSizeInBytes) - || hrq.shouldDrainImmediately()) { - // peek now - if (logger.isDebugEnabled()) { - logger.debug("Peeking queue " + hrq.getId() + ": bucketSizeInBytes " + bucketSizeInBytes - + ": batchSizeInBytes" + batchSizeInBytes - + ": timeToWait" + timeToWait - + ": (currentTimeInMillis - hrq.getLastPeekTimeInMillis())" + (currentTimeInMillis - hrq.getLastPeekTimeInMillis())); - } - - list = peekAhead(buckets.get(bIdIndex), hrq); - - if (list != null && list.size() != 0 ) { - for (Object object : list) { - batch.add(object); - peekedEvents.add((HDFSGatewayEventImpl)object); - } - } - } - else { - blockProcesorThreadIfRequired(); - } - if (logger.isDebugEnabled() && batch.size() > 0) { - logger.debug(this + ": Peeked a batch of " + batch.size() + " entries"); - } - - setElementsPeekedAcrossBuckets(batch.size()); - - return batch; - } - - /** - * This function maintains an index of the last processed bucket. - * When it is called, it returns index of the next bucket. - * @param totalBuckets - * @return current bucket index - */ - private int getCurrentBucketIndex(int totalBuckets) { - int retBucket = currentBucketIndex; - if (retBucket >= totalBuckets) { - currentBucketIndex = 0; - retBucket = 0; - } - - currentBucketIndex++; - - return retBucket; - } - - @Override - public void remove(int batchSize) throws CacheException { - int destroyed = 0; - HDFSGatewayEventImpl event = null; - - if (this.peekedEvents.size() > 0) - event = (HDFSGatewayEventImpl)this.peekedEvents.remove(); - - while (event != null && destroyed < batchSize) { - Region currentRegion = event.getRegion(); - int currentBucketId = event.getBucketId(); - int bucketId = event.getBucketId(); - - ArrayList listToDestroy = new ArrayList(); - ArrayList destroyedSeqNum = new ArrayList(); - - // create a batch of all the entries of a bucket - while (bucketId == currentBucketId) { - listToDestroy.add(event); - destroyedSeqNum.add(event.getShadowKey()); - destroyed++; - - if (this.peekedEvents.size() == 0 || (destroyed) >= batchSize) { - event = null; - break; - } - - event = (HDFSGatewayEventImpl)this.peekedEvents.remove(); - - bucketId = event.getBucketId(); - - if (!this.sender.isRunning()){ - if (logger.isDebugEnabled()) { - logger.debug("ParallelGatewaySenderQueue#remove: Cache is closing down. Ignoring remove request."); - } - return; - } - } - try { - HDFSBucketRegionQueue brq = getBucketRegionQueue((PartitionedRegion) currentRegion, currentBucketId); - - if (brq != null) { - // destroy the entries from the bucket - brq.destroyKeys(listToDestroy); - // Adding the removed event to the map for BatchRemovalMessage - // We need to provide the prQ as there could be multiple - // queue in a PGS now. - PartitionedRegion prQ = brq.getPartitionedRegion(); - addRemovedEvents(prQ, currentBucketId, destroyedSeqNum); - } - - } catch (ForceReattemptException e) { - if (logger.isDebugEnabled()) { - logger.debug("ParallelGatewaySenderQueue#remove: " + "Got ForceReattemptException for " + this - + " for bucket = " + bucketId); - } - } - catch(EntryNotFoundException e) { - if (logger.isDebugEnabled()) { - logger.debug("ParallelGatewaySenderQueue#remove: " + "Got EntryNotFoundException for " + this - + " for bucket = " + bucketId ); - } - } - } - } - - /** - * Keeps a track of number of elements peeked across all buckets. - */ - private void setElementsPeekedAcrossBuckets(int peekedElements) { - this.elementsPeekedAcrossBuckets +=peekedElements; - } - - /** - * Returns the number of elements peeked across all buckets. Also, - * resets this counter. - */ - private int getAndresetElementsPeekedAcrossBuckets() { - int peekedElements = this.elementsPeekedAcrossBuckets; - this.elementsPeekedAcrossBuckets = 0; - return peekedElements; - } - - @Override - public void remove() throws CacheException { - throw new UnsupportedOperationException("Method HDFSParallelGatewaySenderQueue#remove is not supported"); - } - - @Override - public void put(Object object) throws InterruptedException, CacheException { - super.put(object); - } - - protected ArrayList peekAhead(int bucketId, HDFSBucketRegionQueue hrq) throws CacheException { - - if (logger.isDebugEnabled()) { - logger.debug(this + ": Peekahead for the bucket " + bucketId); - } - ArrayList list = hrq.peekABatch(); - if (logger.isDebugEnabled() && list != null ) { - logger.debug(this + ": Peeked" + list.size() + "objects from bucket " + bucketId); - } - - return list; - } - - @Override - public Object take() { - throw new UnsupportedOperationException("take() is not supported for " + HDFSParallelGatewaySenderQueue.class.toString()); - } - - protected boolean isUsedForHDFS() - { - return true; - } - - @Override - protected void afterRegionAdd (PartitionedRegion userPR) { - } - - /** - * gets the value for region key from the HDFSBucketRegionQueue - * @param region - * @throws ForceReattemptException - */ - public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey, int bucketId) throws ForceReattemptException { - try { - HDFSBucketRegionQueue brq = getBucketRegionQueue(region, bucketId); - - if (brq ==null) - return null; - - return brq.getObjectForRegionKey(region, regionKey); - } catch(EntryNotFoundException e) { - if (logger.isDebugEnabled()) { - logger.debug("HDFSParallelGatewaySenderQueue#get: " + "Got EntryNotFoundException for " + this - + " for bucket = " + bucketId); - } - } - return null; - } - - @Override - public void clear(PartitionedRegion pr, int bucketId) { - HDFSBucketRegionQueue brq; - try { - brq = getBucketRegionQueue(pr, bucketId); - if (brq == null) - return; - brq.clear(); - } catch (ForceReattemptException e) { - //do nothing, bucket was destroyed. - } - } - - @Override - public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException { - HDFSBucketRegionQueue hq = getBucketRegionQueue(pr, bucketId); - return hq.size(); - } - - public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region, - int bucketId) throws ForceReattemptException { - PartitionedRegion leader = ColocationHelper.getLeaderRegion(region); - if (leader == null) - return null; - String leaderregionPath = leader.getFullPath(); - PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(leaderregionPath); - if (prQ == null) - return null; - HDFSBucketRegionQueue brq; - - brq = ((HDFSBucketRegionQueue)prQ.getDataStore() - .getLocalBucketById(bucketId)); - if(brq == null) { - prQ.getRegionAdvisor().waitForLocalBucketStorage(bucketId); - } - brq = ((HDFSBucketRegionQueue)prQ.getDataStore() - .getInitializedBucketForId(null, bucketId)); - return brq; - } - - /** - * This class has the responsibility of rolling the lists of Sorted event - * Queue. The rolling of lists by a separate thread is required because - * neither put thread nor the peek/remove thread can do that. Put thread - * cannot do it because that would mean doing some synchronization with - * other put threads and peek thread that would hamper the put latency. - * Peek thread cannot do it because if the event insert rate is too high - * the list size can go way beyond what its size. - * - */ - class RollSortedListsTimerTask extends SystemTimerTask { - - - /** - * This function ensures that if any of the buckets has lists that are beyond - * its size, they gets rolled over into new skip lists. - */ - @Override - public void run2() { - Set prQs = getRegions(); - for (PartitionedRegion prQ : prQs) { - ArrayList buckets = new ArrayList(prQ - .getDataStore().getAllLocalPrimaryBucketIds()); - for (Integer bId : buckets) { - HDFSBucketRegionQueue hrq = ((HDFSBucketRegionQueue)prQ - .getDataStore().getLocalBucketById(bId)); - if (hrq == null) { - // bucket moved to another node after getAllLocalPrimaryBucketIds - // was called. continue fixing the next bucket. - continue; - } - if (logger.isDebugEnabled()) { - logger.debug("Rolling over the list for bucket id: " + bId); - } - hrq.rolloverSkipList(); - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java deleted file mode 100644 index 16d3d87..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java +++ /dev/null @@ -1,559 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal; - -import java.io.Serializable; - -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.GemFireConfigException; -import com.gemstone.gemfire.cache.hdfs.HDFSStore; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator; -import com.gemstone.gemfire.cache.hdfs.StoreExistsException; -import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; - - -/** - * Class to hold all hdfs store related configuration. Instead of copying the - * same members in two different classes, factory and store, this class will be - * used. The idea is let HdfsStoreImpl and HdfsStoreCreation delegate get calls, - * set calls and copy constructor calls this class. Moreover this config holder - * can be entirely replaced to support alter config - * - */ -public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Serializable { - private String name = null; - private String namenodeURL = null; - private String homeDir = DEFAULT_HOME_DIR; - private String clientConfigFile = null; - private float blockCacheSize = DEFAULT_BLOCK_CACHE_SIZE; - private int maxFileSize = DEFAULT_WRITE_ONLY_FILE_SIZE_LIMIT; - private int fileRolloverInterval = DEFAULT_WRITE_ONLY_FILE_ROLLOVER_INTERVAL; - protected boolean isAutoCompact = DEFAULT_MINOR_COMPACTION; - protected boolean autoMajorCompact = DEFAULT_MAJOR_COMPACTION; - protected int maxConcurrency = DEFAULT_MINOR_COMPACTION_THREADS; - protected int majorCompactionConcurrency = DEFAULT_MAJOR_COMPACTION_THREADS; - protected int majorCompactionIntervalMins = DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS; - protected int maxInputFileSizeMB = DEFAULT_INPUT_FILE_SIZE_MAX_MB; - protected int maxInputFileCount = DEFAULT_INPUT_FILE_COUNT_MAX; - protected int minInputFileCount = DEFAULT_INPUT_FILE_COUNT_MIN; - protected int oldFileCleanupIntervalMins = DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS; - - protected int batchSize = DEFAULT_BATCH_SIZE_MB; - protected int batchIntervalMillis = DEFAULT_BATCH_INTERVAL_MILLIS; - protected int maximumQueueMemory = DEFAULT_MAX_BUFFER_MEMORY; - protected boolean isPersistenceEnabled = DEFAULT_BUFFER_PERSISTANCE; - protected String diskStoreName = null; - protected boolean diskSynchronous = DEFAULT_DISK_SYNCHRONOUS; - protected int dispatcherThreads = DEFAULT_DISPATCHER_THREADS; - - private static final Logger logger = LogService.getLogger(); - protected final String logPrefix; - - public HDFSStoreConfigHolder() { - this(null); - } - - /** - * @param config configuration source for creating this instance - */ - public HDFSStoreConfigHolder(HDFSStore config) { - this.logPrefix = "<" + getName() + "> "; - if (config == null) { - return; - } - - this.name = config.getName(); - this.namenodeURL = config.getNameNodeURL(); - this.homeDir = config.getHomeDir(); - this.clientConfigFile = config.getHDFSClientConfigFile(); - this.blockCacheSize = config.getBlockCacheSize(); - this.maxFileSize = config.getWriteOnlyFileRolloverSize(); - this.fileRolloverInterval = config.getWriteOnlyFileRolloverInterval(); - isAutoCompact = config.getMinorCompaction(); - maxConcurrency = config.getMinorCompactionThreads(); - autoMajorCompact = config.getMajorCompaction(); - majorCompactionConcurrency = config.getMajorCompactionThreads(); - majorCompactionIntervalMins = config.getMajorCompactionInterval(); - maxInputFileSizeMB = config.getInputFileSizeMax(); - maxInputFileCount = config.getInputFileCountMax(); - minInputFileCount = config.getInputFileCountMin(); - oldFileCleanupIntervalMins = config.getPurgeInterval(); - - batchSize = config.getBatchSize(); - batchIntervalMillis = config.getBatchInterval(); - maximumQueueMemory = config.getMaxMemory(); - isPersistenceEnabled = config.getBufferPersistent(); - diskStoreName = config.getDiskStoreName(); - diskSynchronous = config.getSynchronousDiskWrite(); - dispatcherThreads = config.getDispatcherThreads(); - } - - public void resetDefaultValues() { - name = null; - namenodeURL = null; - homeDir = null; - clientConfigFile = null; - blockCacheSize = -1f; - maxFileSize = -1; - fileRolloverInterval = -1; - - isAutoCompact = false; - maxConcurrency = -1; - maxInputFileSizeMB = -1; - maxInputFileCount = -1; - minInputFileCount = -1; - oldFileCleanupIntervalMins = -1; - - autoMajorCompact = false; - majorCompactionConcurrency = -1; - majorCompactionIntervalMins = -1; - - batchSize = -1; - batchIntervalMillis = -1; - maximumQueueMemory = -1; - isPersistenceEnabled = false; - diskStoreName = null; - diskSynchronous = false; - dispatcherThreads = -1; - } - - public void copyFrom(HDFSStoreMutator mutator) { - if (mutator.getWriteOnlyFileRolloverInterval() >= 0) { - logAttrMutation("fileRolloverInterval", mutator.getWriteOnlyFileRolloverInterval()); - setWriteOnlyFileRolloverInterval(mutator.getWriteOnlyFileRolloverInterval()); - } - if (mutator.getWriteOnlyFileRolloverSize() >= 0) { - logAttrMutation("MaxFileSize", mutator.getWriteOnlyFileRolloverInterval()); - setWriteOnlyFileRolloverSize(mutator.getWriteOnlyFileRolloverSize()); - } - - if (mutator.getMinorCompaction() != null) { - logAttrMutation("MinorCompaction", mutator.getMinorCompaction()); - setMinorCompaction(mutator.getMinorCompaction()); - } - - if (mutator.getMinorCompactionThreads() >= 0) { - logAttrMutation("MaxThreads", mutator.getMinorCompactionThreads()); - setMinorCompactionThreads(mutator.getMinorCompactionThreads()); - } - - if (mutator.getMajorCompactionInterval() > -1) { - logAttrMutation("MajorCompactionIntervalMins", mutator.getMajorCompactionInterval()); - setMajorCompactionInterval(mutator.getMajorCompactionInterval()); - } - if (mutator.getMajorCompactionThreads() >= 0) { - logAttrMutation("MajorCompactionMaxThreads", mutator.getMajorCompactionThreads()); - setMajorCompactionThreads(mutator.getMajorCompactionThreads()); - } - if (mutator.getMajorCompaction() != null) { - logAttrMutation("AutoMajorCompaction", mutator.getMajorCompaction()); - setMajorCompaction(mutator.getMajorCompaction()); - } - if (mutator.getInputFileCountMax() >= 0) { - logAttrMutation("maxInputFileCount", mutator.getInputFileCountMax()); - setInputFileCountMax(mutator.getInputFileCountMax()); - } - if (mutator.getInputFileSizeMax() >= 0) { - logAttrMutation("MaxInputFileSizeMB", mutator.getInputFileSizeMax()); - setInputFileSizeMax(mutator.getInputFileSizeMax()); - } - if (mutator.getInputFileCountMin() >= 0) { - logAttrMutation("MinInputFileCount", mutator.getInputFileCountMin()); - setInputFileCountMin(mutator.getInputFileCountMin()); - } - if (mutator.getPurgeInterval() >= 0) { - logAttrMutation("OldFilesCleanupIntervalMins", mutator.getPurgeInterval()); - setPurgeInterval(mutator.getPurgeInterval()); - } - - if (mutator.getBatchSize() >= 0) { - logAttrMutation("batchSizeMB", mutator.getWriteOnlyFileRolloverInterval()); - setBatchSize(mutator.getBatchSize()); - } - if (mutator.getBatchInterval() >= 0) { - logAttrMutation("batchTimeInterval", mutator.getWriteOnlyFileRolloverInterval()); - setBatchInterval(mutator.getBatchInterval()); - } - } - - void logAttrMutation(String name, Object value) { - if (logger.isDebugEnabled()) { - logger.debug("{}Alter " + name + ":" + value, logPrefix); - } - } - - @Override - public String getName() { - return name; - } - @Override - public HDFSStoreFactory setName(String name) { - this.name = name; - return this; - } - - @Override - public String getNameNodeURL() { - return namenodeURL; - } - @Override - public HDFSStoreFactory setNameNodeURL(String namenodeURL) { - this.namenodeURL = namenodeURL; - return this; - } - - @Override - public String getHomeDir() { - return homeDir; - } - @Override - public HDFSStoreFactory setHomeDir(String homeDir) { - this.homeDir = homeDir; - return this; - } - - @Override - public String getHDFSClientConfigFile() { - return clientConfigFile; - } - @Override - public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) { - this.clientConfigFile = clientConfigFile; - return this; - } - - @Override - public HDFSStoreFactory setBlockCacheSize(float percentage) { - if(percentage < 0 || percentage > 100) { - throw new IllegalArgumentException("Block cache size must be between 0 and 100, inclusive"); - } - this.blockCacheSize = percentage; - return this; - } - - @Override - public float getBlockCacheSize() { - return blockCacheSize; - } - - @Override - public HDFSStoreFactory setWriteOnlyFileRolloverSize(int maxFileSize) { - assertIsPositive(CacheXml.HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL, maxFileSize); - this.maxFileSize = maxFileSize; - return this; - } - @Override - public int getWriteOnlyFileRolloverSize() { - return maxFileSize; - } - - @Override - public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) { - assertIsPositive(CacheXml.HDFS_TIME_FOR_FILE_ROLLOVER, count); - this.fileRolloverInterval = count; - return this; - } - @Override - public int getWriteOnlyFileRolloverInterval() { - return fileRolloverInterval; - } - - @Override - public boolean getMinorCompaction() { - return isAutoCompact; - } - @Override - public HDFSStoreFactory setMinorCompaction(boolean auto) { - this.isAutoCompact = auto; - return this; - } - - @Override - public HDFSStoreFactory setMinorCompactionThreads(int count) { - assertIsPositive(CacheXml.HDFS_MINOR_COMPACTION_THREADS, count); - this.maxConcurrency = count; - return this; - } - @Override - public int getMinorCompactionThreads() { - return maxConcurrency; - } - - @Override - public HDFSStoreFactory setMajorCompaction(boolean auto) { - this.autoMajorCompact = auto; - return this; - } - @Override - public boolean getMajorCompaction() { - return autoMajorCompact; - } - - @Override - public HDFSStoreFactory setMajorCompactionInterval(int count) { - HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_INTERVAL, count); - this.majorCompactionIntervalMins = count; - return this; - } - @Override - public int getMajorCompactionInterval() { - return majorCompactionIntervalMins; - } - - @Override - public HDFSStoreFactory setMajorCompactionThreads(int count) { - HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_THREADS, count); - this.majorCompactionConcurrency = count; - return this; - } - @Override - public int getMajorCompactionThreads() { - return majorCompactionConcurrency; - } - - @Override - public HDFSStoreFactory setInputFileSizeMax(int size) { - HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_SIZE_MB", size); - this.maxInputFileSizeMB = size; - return this; - } - @Override - public int getInputFileSizeMax() { - return maxInputFileSizeMB; - } - - @Override - public HDFSStoreFactory setInputFileCountMin(int count) { - HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MIN_INPUT_FILE_COUNT", count); - this.minInputFileCount = count; - return this; - } - @Override - public int getInputFileCountMin() { - return minInputFileCount; - } - - @Override - public HDFSStoreFactory setInputFileCountMax(int count) { - HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_COUNT", count); - this.maxInputFileCount = count; - return this; - } - @Override - public int getInputFileCountMax() { - return maxInputFileCount; - } - - @Override - public int getPurgeInterval() { - return oldFileCleanupIntervalMins ; - } - @Override - public HDFSStoreFactory setPurgeInterval(int interval) { - assertIsPositive(CacheXml.HDFS_PURGE_INTERVAL, interval); - this.oldFileCleanupIntervalMins = interval; - return this; - } - - protected void validate() { - if (minInputFileCount > maxInputFileCount) { - throw new IllegalArgumentException( - LocalizedStrings.HOPLOG_MIN_IS_MORE_THAN_MAX - .toLocalizedString(new Object[] { - "HDFS_COMPACTION_MIN_INPUT_FILE_COUNT", - minInputFileCount, - "HDFS_COMPACTION_MAX_INPUT_FILE_COUNT", - maxInputFileCount })); - } - } - - /** - * This method should not be called on this class. - * @see HDFSStoreFactory#create(String) - */ - @Override - public HDFSStore create(String name) throws GemFireConfigException, - StoreExistsException { - throw new UnsupportedOperationException(); - } - - /** - * This method should not be called on this class. - * @see HDFSStoreImpl#destroy() - */ - @Override - public void destroy() { - throw new UnsupportedOperationException(); - } - - public static void assertIsPositive(String name, int count) { - if (count < 1) { - throw new IllegalArgumentException( - LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE - .toLocalizedString(new Object[] { name, count })); - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("HDFSStoreConfigHolder@"); - builder.append(System.identityHashCode(this)); - builder.append(" ["); - appendStrProp(builder, name, "name"); - appendStrProp(builder, namenodeURL, "namenodeURL"); - appendStrProp(builder, homeDir, "homeDir"); - appendStrProp(builder, clientConfigFile, "clientConfigFile"); - if (blockCacheSize > -1) { - builder.append("blockCacheSize="); - builder.append(blockCacheSize); - builder.append(", "); - } - appendIntProp(builder, maxFileSize, "maxFileSize"); - appendIntProp(builder, fileRolloverInterval, "fileRolloverInterval"); - appendBoolProp(builder, isAutoCompact, "isAutoCompact"); - appendBoolProp(builder, autoMajorCompact, "autoMajorCompact"); - appendIntProp(builder, maxConcurrency, "maxConcurrency"); - appendIntProp(builder, majorCompactionConcurrency, "majorCompactionConcurrency"); - appendIntProp(builder, majorCompactionIntervalMins, "majorCompactionIntervalMins"); - appendIntProp(builder, maxInputFileSizeMB, "maxInputFileSizeMB"); - appendIntProp(builder, maxInputFileCount, "maxInputFileCount"); - appendIntProp(builder, minInputFileCount, "minInputFileCount"); - appendIntProp(builder, oldFileCleanupIntervalMins, "oldFileCleanupIntervalMins"); - appendIntProp(builder, batchSize, "batchSize"); - appendIntProp(builder, batchIntervalMillis, "batchInterval"); - appendIntProp(builder, maximumQueueMemory, "maximumQueueMemory"); - appendIntProp(builder, dispatcherThreads, "dispatcherThreads"); - appendBoolProp(builder, isPersistenceEnabled, "isPersistenceEnabled"); - appendStrProp(builder, diskStoreName, "diskStoreName"); - appendBoolProp(builder, diskSynchronous, "diskSynchronous"); - - builder.append("]"); - return builder.toString(); - } - - private void appendStrProp(StringBuilder builder, String value, String name) { - if (value != null) { - builder.append(name + "="); - builder.append(value); - builder.append(", "); - } - } - - private void appendIntProp(StringBuilder builder, int value, String name) { - if (value > -1) { - builder.append(name + "="); - builder.append(value); - builder.append(", "); - } - } - - private void appendBoolProp(StringBuilder builder, boolean value, String name) { - builder.append(name + "="); - builder.append(value); - builder.append(", "); - } - - @Override - public HDFSStoreMutator createHdfsStoreMutator() { - // as part of alter execution, hdfs store will replace the config holder - // completely. Hence mutator at the config holder is not needed - throw new UnsupportedOperationException(); - } - - @Override - public HDFSStore alter(HDFSStoreMutator mutator) { - // as part of alter execution, hdfs store will replace the config holder - // completely. Hence mutator at the config holder is not needed - throw new UnsupportedOperationException(); - } - - @Override - public String getDiskStoreName() { - return this.diskStoreName; - } - @Override - public HDFSStoreFactory setDiskStoreName(String name) { - this.diskStoreName = name; - return this; - } - - @Override - public int getBatchInterval() { - return this.batchIntervalMillis; - } - @Override - public HDFSStoreFactory setBatchInterval(int intervalMillis){ - this.batchIntervalMillis = intervalMillis; - return this; - } - - @Override - public boolean getBufferPersistent() { - return isPersistenceEnabled; - } - @Override - public HDFSStoreFactory setBufferPersistent(boolean isPersistent) { - this.isPersistenceEnabled = isPersistent; - return this; - } - - @Override - public int getDispatcherThreads() { - return dispatcherThreads; - } - @Override - public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) { - this.dispatcherThreads = dispatcherThreads; - return this; - } - - @Override - public int getMaxMemory() { - return this.maximumQueueMemory; - } - @Override - public HDFSStoreFactory setMaxMemory(int memory) { - this.maximumQueueMemory = memory; - return this; - } - - @Override - public int getBatchSize() { - return this.batchSize; - } - @Override - public HDFSStoreFactory setBatchSize(int size){ - this.batchSize = size; - return this; - } - - @Override - public boolean getSynchronousDiskWrite() { - return this.diskSynchronous; - } - @Override - public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) { - this.diskSynchronous = isSynchronous; - return this; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java deleted file mode 100644 index 9ecc5e3..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal; - -import com.gemstone.gemfire.GemFireConfigException; -import com.gemstone.gemfire.cache.hdfs.HDFSStore; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory; -import com.gemstone.gemfire.cache.hdfs.StoreExistsException; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; - -/** - */ -public class HDFSStoreCreation implements HDFSStoreFactory { - protected HDFSStoreConfigHolder configHolder; - - public HDFSStoreCreation() { - this(null); - } - - /** - * Copy constructor for HDFSStoreCreation - * @param config configuration source for creating this instance - */ - public HDFSStoreCreation(HDFSStoreCreation config) { - this.configHolder = new HDFSStoreConfigHolder(config == null ? null : config.configHolder); - } - - @Override - public HDFSStoreFactory setName(String name) { - configHolder.setName(name); - return this; - } - - @Override - public HDFSStoreFactory setNameNodeURL(String namenodeURL) { - configHolder.setNameNodeURL(namenodeURL); - return this; - } - - @Override - public HDFSStoreFactory setHomeDir(String homeDir) { - configHolder.setHomeDir(homeDir); - return this; - } - - @Override - public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) { - configHolder.setHDFSClientConfigFile(clientConfigFile); - return this; - } - - @Override - public HDFSStoreFactory setBlockCacheSize(float percentage) { - configHolder.setBlockCacheSize(percentage); - return this; - } - - @Override - public HDFSStoreFactory setWriteOnlyFileRolloverSize(int maxFileSize) { - configHolder.setWriteOnlyFileRolloverSize(maxFileSize); - return this; - } - - @Override - public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) { - configHolder.setWriteOnlyFileRolloverInterval(count); - return this; - } - - @Override - public HDFSStoreFactory setMinorCompaction(boolean auto) { - configHolder.setMinorCompaction(auto); - return this; - } - - @Override - public HDFSStoreFactory setMinorCompactionThreads(int count) { - configHolder.setMinorCompactionThreads(count); - return this; - } - - @Override - public HDFSStoreFactory setMajorCompaction(boolean auto) { - configHolder.setMajorCompaction(auto); - return this; - } - - @Override - public HDFSStoreFactory setMajorCompactionInterval(int count) { - configHolder.setMajorCompactionInterval(count); - return this; - } - - @Override - public HDFSStoreFactory setMajorCompactionThreads(int count) { - configHolder.setMajorCompactionThreads(count); - return this; - } - - @Override - public HDFSStoreFactory setInputFileSizeMax(int size) { - configHolder.setInputFileSizeMax(size); - return this; - } - - @Override - public HDFSStoreFactory setInputFileCountMin(int count) { - configHolder.setInputFileCountMin(count); - return this; - } - - @Override - public HDFSStoreFactory setInputFileCountMax(int count) { - configHolder.setInputFileCountMax(count); - return this; - } - - @Override - public HDFSStoreFactory setPurgeInterval(int interval) { - configHolder.setPurgeInterval(interval); - return this; - } - - @Override - public HDFSStoreFactory setDiskStoreName(String name) { - configHolder.setDiskStoreName(name); - return this; - } - - @Override - public HDFSStoreFactory setMaxMemory(int memory) { - configHolder.setMaxMemory(memory); - return this; - } - - @Override - public HDFSStoreFactory setBatchInterval(int intervalMillis) { - configHolder.setBatchInterval(intervalMillis); - return this; - } - - @Override - public HDFSStoreFactory setBatchSize(int size) { - configHolder.setBatchSize(size); - return this; - } - - @Override - public HDFSStoreFactory setBufferPersistent(boolean isPersistent) { - configHolder.setBufferPersistent(isPersistent); - return this; - } - - @Override - public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) { - configHolder.setSynchronousDiskWrite(isSynchronous); - return this; - } - - @Override - public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) { - configHolder.setDispatcherThreads(dispatcherThreads); - return this; - } - - /** - * This method should not be called on this class. - * @see HDFSStoreFactory#create(String) - */ - @Override - public HDFSStore create(String name) throws GemFireConfigException, - StoreExistsException { - throw new UnsupportedOperationException(); - } - - public static void assertIsPositive(String name, int count) { - if (count < 1) { - throw new IllegalArgumentException( - LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE - .toLocalizedString(new Object[] { name, count })); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java deleted file mode 100644 index 749f01c..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal; - -import com.gemstone.gemfire.GemFireConfigException; -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.hdfs.HDFSStore; -import com.gemstone.gemfire.cache.hdfs.StoreExistsException; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; - - -/** - * Implementation of HDFSStoreFactory - * - */ -public class HDFSStoreFactoryImpl extends HDFSStoreCreation { - public static final String DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS= "HDFS_QUEUE"; - - private Cache cache; - - public HDFSStoreFactoryImpl(Cache cache) { - this(cache, null); - } - - public HDFSStoreFactoryImpl(Cache cache, HDFSStoreCreation config) { - super(config); - this.cache = cache; - } - - @Override - public HDFSStore create(String name) { - if (name == null) { - throw new GemFireConfigException("HDFS store name not provided"); - } - - this.configHolder.validate(); - - HDFSStore result = null; - synchronized (this) { - if (this.cache instanceof GemFireCacheImpl) { - GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache; - if (gfc.findHDFSStore(name) != null) { - throw new StoreExistsException(name); - } - - HDFSStoreImpl hsi = new HDFSStoreImpl(name, this.configHolder); - gfc.addHDFSStore(hsi); - result = hsi; - } - } - return result; - } - - public static final String getEventQueueName(String regionPath) { - return HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS + "_" - + regionPath.replace('/', '_'); - } - - public HDFSStore getConfigView() { - return (HDFSStore) configHolder; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java deleted file mode 100644 index b5d56b6..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java +++ /dev/null @@ -1,638 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.Collection; -import java.util.HashSet; -import java.util.concurrent.Callable; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.io.hfile.BlockCache; -import org.apache.hadoop.hbase.io.hfile.LruBlockCache; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.ConnectTimeoutException; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.cache.hdfs.HDFSIOException; -import com.gemstone.gemfire.cache.hdfs.HDFSStore; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor; -import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; -import com.gemstone.gemfire.internal.util.SingletonCallable; -import com.gemstone.gemfire.internal.util.SingletonValue; -import com.gemstone.gemfire.internal.util.SingletonValue.SingletonBuilder; - -/** - * Represents a HDFS based persistent store for region data. - * - */ -public class HDFSStoreImpl implements HDFSStore { - - private volatile HDFSStoreConfigHolder configHolder; - - private final SingletonValue fs; - - /** - * Used to make sure that only one thread creates the writer at a time. This prevents the dispatcher - * threads from cascading the Connection lock in DFS client see bug 51195 - */ - private final SingletonCallable singletonWriter = new SingletonCallable(); - - private final HFileStoreStatistics stats; - private final BlockCache blockCache; - - private static HashSet secureNameNodes = new HashSet(); - - private final boolean PERFORM_SECURE_HDFS_CHECK = Boolean.getBoolean(HoplogConfig.PERFORM_SECURE_HDFS_CHECK_PROP); - private static final Logger logger = LogService.getLogger(); - protected final String logPrefix; - - static { - HdfsConfiguration.init(); - } - - public HDFSStoreImpl(String name, final HDFSStore config) { - this.configHolder = new HDFSStoreConfigHolder(config); - configHolder.setName(name); - - this.logPrefix = "<" + "HdfsStore:" + name + "> "; - - stats = new HFileStoreStatistics(InternalDistributedSystem.getAnyInstance(), "HDFSStoreStatistics", name); - - final Configuration hconf = new Configuration(); - - // Set the block cache size. - // Disable the static block cache. We keep our own cache on the HDFS Store - // hconf.setFloat("hfile.block.cache.size", 0f); - if (this.getBlockCacheSize() != 0) { - long cacheSize = (long) (HeapMemoryMonitor.getTenuredPoolMaxMemory() * this.getBlockCacheSize() / 100); - - // TODO use an off heap block cache if we're using off heap memory? - // See CacheConfig.instantiateBlockCache. - // According to Anthony, the off heap block cache is still - // experimental. Our own off heap cache might be a better bet. -// this.blockCache = new LruBlockCache(cacheSize, -// StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf, HFileSortedOplogFactory.convertStatistics(stats)); - this.blockCache = new LruBlockCache(cacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf); - } else { - this.blockCache = null; - } - - final String clientFile = config.getHDFSClientConfigFile(); - fs = new SingletonValue(new SingletonBuilder() { - @Override - public FileSystem create() throws IOException { - return createFileSystem(hconf, clientFile, false); - } - - @Override - public void postCreate() { - } - - @Override - public void createInProgress() { - } - }); - - FileSystem fileSystem = null; - try { - fileSystem = fs.get(); - } catch (Throwable ex) { - throw new HDFSIOException(ex.getMessage(),ex); - } - //HDFSCompactionConfig has already been initialized - long cleanUpIntervalMillis = getPurgeInterval() * 60 * 1000; - Path cleanUpIntervalPath = new Path(getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME); - HoplogUtil.exposeCleanupIntervalMillis(fileSystem, cleanUpIntervalPath, cleanUpIntervalMillis); - } - - /** - * Creates a new file system every time. - */ - public FileSystem createFileSystem() { - Configuration hconf = new Configuration(); - try { - return createFileSystem(hconf, this.getHDFSClientConfigFile(), true); - } catch (Throwable ex) { - throw new HDFSIOException(ex.getMessage(),ex); - } - } - - private FileSystem createFileSystem(Configuration hconf, String configFile, boolean forceNew) throws IOException { - FileSystem filesystem = null; - - // load hdfs client config file if specified. The path is on local file - // system - if (configFile != null) { - if (logger.isDebugEnabled()) { - logger.debug("{}Adding resource config file to hdfs configuration:" + configFile, logPrefix); - } - hconf.addResource(new Path(configFile)); - - if (! new File(configFile).exists()) { - logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_CLIENT_CONFIG_FILE_ABSENT, configFile)); - } - } - - // This setting disables shutdown hook for file system object. Shutdown - // hook may cause FS object to close before the cache or store and - // unpredictable behavior. This setting is provided for GFXD like server - // use cases where FS close is managed by a server. This setting is not - // supported by old versions of hadoop, HADOOP-4829 - hconf.setBoolean("fs.automatic.close", false); - - // Hadoop has a configuration parameter io.serializations that is a list of serialization - // classes which can be used for obtaining serializers and deserializers. This parameter - // by default contains avro classes. When a sequence file is created, it calls - // SerializationFactory.getSerializer(keyclass). This internally creates objects using - // reflection of all the classes that were part of io.serializations. But since, there is - // no avro class available it throws an exception. - // Before creating a sequenceFile, override the io.serializations parameter and pass only the classes - // that are important to us. - hconf.setStrings("io.serializations", - new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"}); - // create writer - - SchemaMetrics.configureGlobally(hconf); - - String nameNodeURL = null; - if ((nameNodeURL = getNameNodeURL()) == null) { - nameNodeURL = hconf.get("fs.default.name"); - } - - URI namenodeURI = URI.create(nameNodeURL); - - //if (! GemFireCacheImpl.getExisting().isHadoopGfxdLonerMode()) { - String authType = hconf.get("hadoop.security.authentication"); - - //The following code handles Gemfire XD with secure HDFS - //A static set is used to cache all known secure HDFS NameNode urls. - UserGroupInformation.setConfiguration(hconf); - - //Compare authentication method ignoring case to make GFXD future version complaint - //At least version 2.0.2 starts complaining if the string "kerberos" is not in all small case. - //However it seems current version of hadoop accept the authType in any case - if (authType.equalsIgnoreCase("kerberos")) { - - String principal = hconf.get(HoplogConfig.KERBEROS_PRINCIPAL); - String keyTab = hconf.get(HoplogConfig.KERBEROS_KEYTAB_FILE); - - if (!PERFORM_SECURE_HDFS_CHECK) { - if (logger.isDebugEnabled()) - logger.debug("{}Ignore secure hdfs check", logPrefix); - } else { - if (!secureNameNodes.contains(nameNodeURL)) { - if (logger.isDebugEnabled()) - logger.debug("{}Executing secure hdfs check", logPrefix); - try{ - filesystem = FileSystem.newInstance(namenodeURI, hconf); - //Make sure no IOExceptions are generated when accessing insecure HDFS. - filesystem.listFiles(new Path("/"),false); - throw new HDFSIOException("Gemfire XD HDFS client and HDFS cluster security levels do not match. The configured HDFS Namenode is not secured."); - } catch (IOException ex) { - secureNameNodes.add(nameNodeURL); - } finally { - //Close filesystem to avoid resource leak - if(filesystem != null) { - closeFileSystemIgnoreError(filesystem); - } - } - } - } - - // check to ensure the namenode principal is defined - String nameNodePrincipal = hconf.get("dfs.namenode.kerberos.principal"); - if (nameNodePrincipal == null) { - throw new IOException(LocalizedStrings.GF_KERBEROS_NAMENODE_PRINCIPAL_UNDEF.toLocalizedString()); - } - - // ok, the user specified a gfxd principal so we will try to login - if (principal != null) { - //If NameNode principal is the same as Gemfire XD principal, there is a - //potential security hole - String regex = "[/@]"; - if (nameNodePrincipal != null) { - String HDFSUser = nameNodePrincipal.split(regex)[0]; - String GFXDUser = principal.split(regex)[0]; - if (HDFSUser.equals(GFXDUser)) { - logger.warn(LocalizedMessage.create(LocalizedStrings.HDFS_USER_IS_SAME_AS_GF_USER, GFXDUser)); - } - } - - // a keytab must exist if the user specifies a principal - if (keyTab == null) { - throw new IOException(LocalizedStrings.GF_KERBEROS_KEYTAB_UNDEF.toLocalizedString()); - } - - // the keytab must exist as well - File f = new File(keyTab); - if (!f.exists()) { - throw new FileNotFoundException(LocalizedStrings.GF_KERBEROS_KEYTAB_FILE_ABSENT.toLocalizedString(f.getAbsolutePath())); - } - - //Authenticate Gemfire XD principal to Kerberos KDC using Gemfire XD keytab file - String principalWithValidHost = SecurityUtil.getServerPrincipal(principal, ""); - UserGroupInformation.loginUserFromKeytab(principalWithValidHost, keyTab); - } else { - logger.warn(LocalizedMessage.create(LocalizedStrings.GF_KERBEROS_PRINCIPAL_UNDEF)); - } - } - //} - - filesystem = getFileSystemFactory().create(namenodeURI, hconf, forceNew); - - if (logger.isDebugEnabled()) { - logger.debug("{}Initialized FileSystem linked to " + filesystem.getUri() - + " " + filesystem.hashCode(), logPrefix); - } - return filesystem; - } - - public FileSystem getFileSystem() throws IOException { - return fs.get(); - } - - public FileSystem getCachedFileSystem() { - return fs.getCachedValue(); - } - - public SingletonCallable getSingletonWriter() { - return this.singletonWriter; - } - - private final SingletonCallable fsExists = new SingletonCallable(); - - public boolean checkFileSystemExists() throws IOException { - try { - return fsExists.runSerially(new Callable() { - @Override - public Boolean call() throws Exception { - FileSystem fileSystem = getCachedFileSystem(); - if (fileSystem == null) { - return false; - } - return fileSystem.exists(new Path("/")); - } - }); - } catch (Exception e) { - if (e instanceof IOException) { - throw (IOException)e; - } - throw new IOException(e); - } - } - - /** - * This method executes a query on namenode. If the query succeeds, FS - * instance is healthy. If it fails, the old instance is closed and a new - * instance is created. - */ - public void checkAndClearFileSystem() { - FileSystem fileSystem = getCachedFileSystem(); - - if (fileSystem != null) { - if (logger.isDebugEnabled()) { - logger.debug("{}Checking file system at " + fileSystem.getUri(), logPrefix); - } - try { - checkFileSystemExists(); - if (logger.isDebugEnabled()) { - logger.debug("{}FS client is ok: " + fileSystem.getUri() + " " - + fileSystem.hashCode(), logPrefix); - } - return; - } catch (ConnectTimeoutException e) { - if (logger.isDebugEnabled()) { - logger.debug("{}Hdfs unreachable, FS client is ok: " - + fileSystem.getUri() + " " + fileSystem.hashCode(), logPrefix); - } - return; - } catch (IOException e) { - logger.debug("IOError in filesystem checkAndClear ", e); - - // The file system is closed or NN is not reachable. It is safest to - // create a new FS instance. If the NN continues to remain unavailable, - // all subsequent read/write request will cause HDFSIOException. This is - // similar to the way hbase manages failures. This has a drawback - // though. A network blip will result in all connections to be - // recreated. However trying to preserve the connections and waiting for - // FS to auto-recover is not deterministic. - if (e instanceof RemoteException) { - e = ((RemoteException) e).unwrapRemoteException(); - } - - logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_UNREACHABLE, - fileSystem.getUri()), e); - } - - // compare and clear FS container. The fs container needs to be reusable - boolean result = fs.clear(fileSystem, true); - if (!result) { - // the FS instance changed after this call was initiated. Check again - logger.debug("{}Failed to clear FS ! I am inconsistent so retrying ..", logPrefix); - checkAndClearFileSystem(); - } else { - closeFileSystemIgnoreError(fileSystem); - } - } - } - - private void closeFileSystemIgnoreError(FileSystem fileSystem) { - if (fileSystem == null) { - logger.debug("{}Trying to close null file system", logPrefix); - return; - } - - try { - if (logger.isDebugEnabled()) { - logger.debug("{}Closing file system at " + fileSystem.getUri() + " " - + fileSystem.hashCode(), logPrefix); - } - fileSystem.close(); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to close file system at " + fileSystem.getUri() - + " " + fileSystem.hashCode(), e); - } - } - } - - public HFileStoreStatistics getStats() { - return stats; - } - - public BlockCache getBlockCache() { - return blockCache; - } - - public void close() { - logger.debug("{}Closing file system: " + getName(), logPrefix); - stats.close(); - blockCache.shutdown(); - //Might want to clear the block cache, but it should be dereferenced. - - // release DDL hoplog organizer for this store. Also shutdown compaction - // threads. These two resources hold references to GemfireCacheImpl - // instance. Any error is releasing this resources is not critical and needs - // be ignored. - try { - HDFSCompactionManager manager = HDFSCompactionManager.getInstance(this); - if (manager != null) { - manager.reset(); - } - } catch (Exception e) { - logger.info(e); - } - - // once this store is closed, this store should not be used again - FileSystem fileSystem = fs.clear(false); - if (fileSystem != null) { - closeFileSystemIgnoreError(fileSystem); - } - } - - /** - * Test hook to remove all of the contents of the the folder - * for this HDFS store from HDFS. - * @throws IOException - */ - public void clearFolder() throws IOException { - getFileSystem().delete(new Path(getHomeDir()), true); - } - - @Override - public void destroy() { - Collection regions = HDFSRegionDirector.getInstance().getRegionsInStore(this); - if(!regions.isEmpty()) { - throw new IllegalStateException("Cannot destroy a HDFS store that still contains regions: " + regions); - } - close(); - HDFSStoreDirector.getInstance().removeHDFSStore(this.getName()); - } - - @Override - public synchronized HDFSStore alter(HDFSStoreMutator mutator) { - if (logger.isDebugEnabled()) { - logger.debug("{}Altering hdfsStore " + this, logPrefix); - logger.debug("{}Mutator " + mutator, logPrefix); - } - HDFSStoreConfigHolder newHolder = new HDFSStoreConfigHolder(configHolder); - newHolder.copyFrom(mutator); - newHolder.validate(); - HDFSStore oldStore = configHolder; - configHolder = newHolder; - if (logger.isDebugEnabled()) { - logger.debug("{}Resuult of Alter " + this, logPrefix); - } - return (HDFSStore) oldStore; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("HDFSStoreImpl ["); - if (configHolder != null) { - builder.append("configHolder="); - builder.append(configHolder); - } - builder.append("]"); - return builder.toString(); - } - - @Override - public String getName() { - return configHolder.getName(); - } - - @Override - public String getNameNodeURL() { - return configHolder.getNameNodeURL(); - } - - @Override - public String getHomeDir() { - return configHolder.getHomeDir(); - } - - @Override - public String getHDFSClientConfigFile() { - return configHolder.getHDFSClientConfigFile(); - } - - @Override - public float getBlockCacheSize() { - return configHolder.getBlockCacheSize(); - } - - @Override - public int getWriteOnlyFileRolloverSize() { - return configHolder.getWriteOnlyFileRolloverSize(); - } - - @Override - public int getWriteOnlyFileRolloverInterval() { - return configHolder.getWriteOnlyFileRolloverInterval(); - } - - @Override - public boolean getMinorCompaction() { - return configHolder.getMinorCompaction(); - } - - @Override - public int getMinorCompactionThreads() { - return configHolder.getMinorCompactionThreads(); - } - - @Override - public boolean getMajorCompaction() { - return configHolder.getMajorCompaction(); - } - - @Override - public int getMajorCompactionInterval() { - return configHolder.getMajorCompactionInterval(); - } - - @Override - public int getMajorCompactionThreads() { - return configHolder.getMajorCompactionThreads(); - } - - - @Override - public int getInputFileSizeMax() { - return configHolder.getInputFileSizeMax(); - } - - @Override - public int getInputFileCountMin() { - return configHolder.getInputFileCountMin(); - } - - @Override - public int getInputFileCountMax() { - return configHolder.getInputFileCountMax(); - } - - @Override - public int getPurgeInterval() { - return configHolder.getPurgeInterval(); - } - - @Override - public String getDiskStoreName() { - return configHolder.getDiskStoreName(); - } - - @Override - public int getMaxMemory() { - return configHolder.getMaxMemory(); - } - - @Override - public int getBatchSize() { - return configHolder.getBatchSize(); - } - - @Override - public int getBatchInterval() { - return configHolder.getBatchInterval(); - } - - @Override - public boolean getBufferPersistent() { - return configHolder.getBufferPersistent(); - } - - @Override - public boolean getSynchronousDiskWrite() { - return configHolder.getSynchronousDiskWrite(); - } - - @Override - public int getDispatcherThreads() { - return configHolder.getDispatcherThreads(); - } - - @Override - public HDFSStoreMutator createHdfsStoreMutator() { - return new HDFSStoreMutatorImpl(); - } - - public FileSystemFactory getFileSystemFactory() { - return new DistributedFileSystemFactory(); - } - - /* - * Factory to create HDFS file system instances - */ - static public interface FileSystemFactory { - public FileSystem create(URI namenode, Configuration conf, boolean forceNew) throws IOException; - } - - /* - * File system factory implementations for creating instances of file system - * connected to distributed HDFS cluster - */ - public class DistributedFileSystemFactory implements FileSystemFactory { - private final boolean ALLOW_TEST_FILE_SYSTEM = Boolean.getBoolean(HoplogConfig.ALLOW_LOCAL_HDFS_PROP); - private final boolean USE_FS_CACHE = Boolean.getBoolean(HoplogConfig.USE_FS_CACHE); - - @Override - public FileSystem create(URI nn, Configuration conf, boolean create) throws IOException { - FileSystem filesystem; - - if (USE_FS_CACHE && !create) { - filesystem = FileSystem.get(nn, conf); - } else { - filesystem = FileSystem.newInstance(nn, conf); - } - - if (filesystem instanceof LocalFileSystem && !ALLOW_TEST_FILE_SYSTEM) { - closeFileSystemIgnoreError(filesystem); - throw new IllegalStateException( - LocalizedStrings.HOPLOG_TRYING_TO_CREATE_STANDALONE_SYSTEM.toLocalizedString(getNameNodeURL())); - } - - return filesystem; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java deleted file mode 100644 index 203e623..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal; - -import com.gemstone.gemfire.cache.hdfs.HDFSStore; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; - -public class HDFSStoreMutatorImpl implements HDFSStoreMutator { - private HDFSStoreConfigHolder configHolder; - private Boolean autoCompact; - private Boolean autoMajorCompact; - - public HDFSStoreMutatorImpl() { - configHolder = new HDFSStoreConfigHolder(); - configHolder.resetDefaultValues(); - } - - public HDFSStoreMutatorImpl(HDFSStore store) { - configHolder = new HDFSStoreConfigHolder(store); - } - - public HDFSStoreMutator setWriteOnlyFileRolloverSize(int maxFileSize) { - configHolder.setWriteOnlyFileRolloverSize(maxFileSize); - return this; - } - @Override - public int getWriteOnlyFileRolloverSize() { - return configHolder.getWriteOnlyFileRolloverSize(); - } - - @Override - public HDFSStoreMutator setWriteOnlyFileRolloverInterval(int count) { - configHolder.setWriteOnlyFileRolloverInterval(count); - return this; - } - @Override - public int getWriteOnlyFileRolloverInterval() { - return configHolder.getWriteOnlyFileRolloverInterval(); - } - - @Override - public HDFSStoreMutator setMinorCompaction(boolean auto) { - autoCompact = Boolean.valueOf(auto); - configHolder.setMinorCompaction(auto); - return null; - } - @Override - public Boolean getMinorCompaction() { - return autoCompact; - } - - @Override - public HDFSStoreMutator setMinorCompactionThreads(int count) { - configHolder.setMinorCompactionThreads(count); - return this; - } - @Override - public int getMinorCompactionThreads() { - return configHolder.getMinorCompactionThreads(); - } - - @Override - public HDFSStoreMutator setMajorCompaction(boolean auto) { - autoMajorCompact = Boolean.valueOf(auto); - configHolder.setMajorCompaction(auto); - return this; - } - @Override - public Boolean getMajorCompaction() { - return autoMajorCompact; - } - - @Override - public HDFSStoreMutator setMajorCompactionInterval(int count) { - configHolder.setMajorCompactionInterval(count); - return this; - } - @Override - public int getMajorCompactionInterval() { - return configHolder.getMajorCompactionInterval(); - } - - @Override - public HDFSStoreMutator setMajorCompactionThreads(int count) { - configHolder.setMajorCompactionThreads(count); - return this; - } - @Override - public int getMajorCompactionThreads() { - return configHolder.getMajorCompactionThreads(); - } - - @Override - public HDFSStoreMutator setInputFileSizeMax(int size) { - configHolder.setInputFileSizeMax(size); - return this; - } - @Override - public int getInputFileSizeMax() { - return configHolder.getInputFileSizeMax(); - } - - @Override - public HDFSStoreMutator setInputFileCountMin(int count) { - configHolder.setInputFileCountMin(count); - return this; - } - @Override - public int getInputFileCountMin() { - return configHolder.getInputFileCountMin(); - } - - @Override - public HDFSStoreMutator setInputFileCountMax(int count) { - configHolder.setInputFileCountMax(count); - return this; - } - @Override - public int getInputFileCountMax() { - return configHolder.getInputFileCountMax(); - } - - @Override - public HDFSStoreMutator setPurgeInterval(int interval) { - configHolder.setPurgeInterval(interval); - return this; - } - @Override - public int getPurgeInterval() { - return configHolder.getPurgeInterval(); - } - - @Override - public int getBatchSize() { - return configHolder.batchSize; - } - @Override - public HDFSStoreMutator setBatchSize(int size) { - configHolder.setBatchSize(size); - return this; - } - - - @Override - public int getBatchInterval() { - return configHolder.batchIntervalMillis; - } - @Override - public HDFSStoreMutator setBatchInterval(int interval) { - configHolder.setBatchInterval(interval); - return this; - } - - public static void assertIsPositive(String name, int count) { - if (count < 1) { - throw new IllegalArgumentException( - LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE - .toLocalizedString(new Object[] { name, count })); - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("HDFSStoreMutatorImpl ["); - if (configHolder != null) { - builder.append("configHolder="); - builder.append(configHolder); - builder.append(", "); - } - if (autoCompact != null) { - builder.append("MinorCompaction="); - builder.append(autoCompact); - builder.append(", "); - } - if (getMajorCompaction() != null) { - builder.append("autoMajorCompaction="); - builder.append(getMajorCompaction()); - builder.append(", "); - } - builder.append("]"); - return builder.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java deleted file mode 100644 index 0298523..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import com.gemstone.gemfire.cache.CacheClosedException; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer; -import com.gemstone.gemfire.i18n.LogWriterI18n; -import com.gemstone.gemfire.internal.cache.BucketRegion; -import com.gemstone.gemfire.internal.cache.ForceReattemptException; -import com.gemstone.gemfire.internal.cache.LocalRegion; -import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.PrimaryBucketException; -import com.gemstone.gemfire.internal.cache.execute.BucketMovedException; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; - -/** - * Listener that persists events to a write only HDFS store - * - */ -public class HDFSWriteOnlyStoreEventListener implements - AsyncEventListener { - - private final LogWriterI18n logger; - private volatile boolean senderStopped = false; - private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f); - - - public HDFSWriteOnlyStoreEventListener(LogWriterI18n logger) { - this.logger = logger; - } - - @Override - public void close() { - senderStopped = true; - } - - @Override - public boolean processEvents(List events) { - if (Hoplog.NOP_WRITE) { - return true; - } - - if (logger.fineEnabled()) - logger.fine("HDFSWriteOnlyStoreEventListener: A total of " + events.size() + " events are sent from GemFire to persist on HDFS"); - boolean success = false; - try { - failureTracker.sleepIfRetry(); - HDFSGatewayEventImpl hdfsEvent = null; - int previousBucketId = -1; - BatchManager bm = null; - for (AsyncEvent asyncEvent : events) { - if (senderStopped){ - if (logger.fineEnabled()) { - logger.fine("HDFSWriteOnlyStoreEventListener.processEvents: Cache is closing down. Ignoring the batch of data."); - } - return false; - } - hdfsEvent = (HDFSGatewayEventImpl)asyncEvent; - if (previousBucketId != hdfsEvent.getBucketId()){ - if (previousBucketId != -1) - persistBatch(bm, previousBucketId); - - previousBucketId = hdfsEvent.getBucketId(); - bm = new BatchManager(); - } - bm.addEvent(hdfsEvent); - } - try { - persistBatch(bm, hdfsEvent.getBucketId()); - } catch (BucketMovedException e) { - logger.fine("Batch could not be written to HDFS as the bucket moved. bucket id: " + - hdfsEvent.getBucketId() + " Exception: " + e); - return false; - } - success = true; - } catch (IOException e) { - logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e); - return false; - } - catch (ClassNotFoundException e) { - logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e); - return false; - } - catch (CacheClosedException e) { - // exit silently - if (logger.fineEnabled()) - logger.fine(e); - return false; - } catch (ForceReattemptException e) { - if (logger.fineEnabled()) - logger.fine(e); - return false; - } catch (InterruptedException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } finally { - failureTracker.record(success); - } - return true; - } - - /** - * Persists batches of multiple regions specified by the batch manager - * - */ - private void persistBatch(BatchManager bm, int bucketId) throws IOException, ForceReattemptException { - Iterator>> eventsPerRegion = - bm.iterator(); - HoplogOrganizer bucketOrganizer = null; - while (eventsPerRegion.hasNext()) { - Map.Entry> eventsForARegion = eventsPerRegion.next(); - bucketOrganizer = getOrganizer((PartitionedRegion) eventsForARegion.getKey(), bucketId); - // bucket organizer cannot be null. - if (bucketOrganizer == null) - throw new BucketMovedException("Bucket moved. BucketID: " + bucketId + " HdfsRegion: " + eventsForARegion.getKey().getName()); - bucketOrganizer.flush(eventsForARegion.getValue().iterator(), eventsForARegion.getValue().size()); - if (logger.fineEnabled()) { - logger.fine("Batch written to HDFS of size " + eventsForARegion.getValue().size() + - " for region " + eventsForARegion.getKey()); - } - } - } - - private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) { - BucketRegion br = region.getDataStore().getLocalBucketById(bucketId); - if (br == null) { - // got rebalanced or something - throw new BucketMovedException("Bucket region is no longer available. BucketId: "+ - bucketId + " HdfsRegion: " + region.getName()); - } - - return br.getHoplogOrganizer(); - } - - /** - * Sorts out events of the multiple regions into lists per region - * - */ - private class BatchManager implements Iterable>> { - private HashMap> regionBatches = - new HashMap>(); - - public void addEvent (HDFSGatewayEventImpl hdfsEvent) throws IOException, ClassNotFoundException { - LocalRegion region = (LocalRegion) hdfsEvent.getRegion(); - ArrayList regionList = regionBatches.get(region); - if (regionList == null) { - regionList = new ArrayList(); - regionBatches.put(region, regionList); - } - regionList.add(new UnsortedHDFSQueuePersistedEvent(hdfsEvent)); - } - - @Override - public Iterator>> iterator() { - return regionBatches.entrySet().iterator(); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java deleted file mode 100644 index c7ba23f..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import com.gemstone.gemfire.cache.hdfs.HDFSIOException; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogListener; - -/** - * Objects of this class needs to be created for every region. These objects - * listen to the oplog events and take appropriate action. - * - */ -public class HoplogListenerForRegion implements HoplogListener { - - private List otherListeners = new CopyOnWriteArrayList(); - - public HoplogListenerForRegion() { - - } - - @Override - public void hoplogCreated(String regionFolder, int bucketId, - Hoplog... oplogs) throws IOException { - for (HoplogListener listener : this.otherListeners) { - listener.hoplogCreated(regionFolder, bucketId, oplogs); - } - } - - @Override - public void hoplogDeleted(String regionFolder, int bucketId, - Hoplog... oplogs) { - for (HoplogListener listener : this.otherListeners) { - try { - listener.hoplogDeleted(regionFolder, bucketId, oplogs); - } catch (IOException e) { - // TODO handle - throw new HDFSIOException(e.getLocalizedMessage(), e); - } - } - } - - public void addListener(HoplogListener listener) { - this.otherListeners.add(listener); - } - - @Override - public void compactionCompleted(String region, int bucket, boolean isMajor) { - for (HoplogListener listener : this.otherListeners) { - listener.compactionCompleted(region, bucket, isMajor); - } - } -}