Reply-To: dev@geode.incubator.apache.org
From: klund@apache.org
To: commits@geode.incubator.apache.org
Date: Thu, 28 Apr 2016 23:10:08 -0000
Message-Id: <4426aa3170a84a45a0747cdb0b8ce4b1@git.apache.org>
In-Reply-To: <82e5948c82d540f8bec515c234fa1706@git.apache.org>
References: <82e5948c82d540f8bec515c234fa1706@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [29/50] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS related code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java
deleted file mode 100644
index f7d746d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.concurrent.Future;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-
-/**
- * Manages bucket level operations on sorted oplog files including creation, reading, serde, bloom
- * buffering and compaction. Abstracts existence of multiple sorted oplog files
- */
-public interface HoplogOrganizer extends HoplogSetReader,
-    HoplogListener, Closeable {
-
-  /**
-   * Iterates on the input buffer and persists it in a new sorted oplog. This invocation may block
-   * if there are too many outstanding write requests.
-   *
-   * @param bufferIter
-   *          ordered iterator on a buffer of objects to be persisted
-   * @param count
-   *          number of K,V pairs expected to be part of flush, 0 if unknown
-   * @throws IOException
-   */
-  public void flush(Iterator bufferIter, int count)
-      throws IOException, ForceReattemptException;
-
-
-  /**
-   * Clear the data in HDFS. This method assumes that the
-   * dispatcher thread has already been paused, so there should be
-   * no concurrent flushes to HDFS when this method is called.
-   *
-   * @throws IOException
-   */
-  public void clear() throws IOException;
-
-  /**
-   * returns the compactor associated with this set
-   */
-  public Compactor getCompactor();
-
-  /**
-   * Called to execute bucket maintenance activities, like purge expired files
-   * and create compaction task. Long running activities must be executed
-   * asynchronously, not on this thread, to avoid impact on other buckets
-   * @throws IOException
-   */
-  public void performMaintenance() throws IOException;
-
-  /**
-   * Schedules a compaction task and returns immediately.
-   *
-   * @param isMajor true for major compaction, false for minor compaction
-   * @return future for status of compaction request
-   */
-  public Future forceCompaction(boolean isMajor);
-
-  /**
-   * Returns the timestamp of the last completed major compaction
-   *
-   * @return the timestamp or 0 if a major compaction has not taken place yet
-   */
-  public long getLastMajorCompactionTimestamp();
-
-  public interface Compactor {
-    /**
-     * Requests a compaction operation be performed on this set of sorted oplogs.
-     *
-     * @param isMajor true for major compaction
-     * @param isForced true if the compaction should be carried out even if there
-     *          is only one hoplog to compact
-     *
-     * @return true if compaction was performed, false otherwise
-     * @throws IOException
-     */
-    boolean compact(boolean isMajor, boolean isForced) throws IOException;
-
-    /**
-     * Stop the current compaction operation in the middle and suspend
-     * compaction operations. The current current compaction data
-     * will be thrown away, and no more compaction will be performend
-     * until resume is called.
-     */
-    void suspend();
-
-    /**
-     * Resume compaction operations.
-     */
-    void resume();
-
-    /**
-     * @return true if the compactor is not ready or busy
-     */
-    boolean isBusy(boolean isMajor);
-
-    /**
-     * @return the hdfsStore configuration used by this compactor
-     */
-    public HDFSStore getHdfsStore();
-  }
-}
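For readers skimming the removed API: a minimal sketch of how a bucket's flush/compaction cycle might have driven this interface. The helper method below is illustrative only, and the event type on the iterator is an assumption (the archive stripped the generic parameters from this diff):

    // Hypothetical caller; assumes an organizer for one bucket already exists.
    void flushAndCompact(HoplogOrganizer organizer,
        Iterator<QueuedPersistentEvent> sortedBatch, int batchSize) throws Exception {
      // persist the ordered in-memory batch as a new sorted oplog file
      organizer.flush(sortedBatch, batchSize);
      // purge expired files and let the organizer schedule minor compactions
      organizer.performMaintenance();
      // request a major compaction and block until it completes
      Future<?> result = organizer.forceCompaction(true);
      result.get();
    }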
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java
deleted file mode 100644
index 16939db..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HFileSortedOplog.HFileReader.HFileSortedIterator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.ByteComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
-
-/**
- * Provides a merged iterator on set of {@link HFileSortedOplog}
- */
-public class HoplogSetIterator implements HoplogIterator {
-  private final List iters;
-
-  // Number of entries remaining to be iterated by this scanner
-  private int entriesRemaining;
-
-  // points at the current iterator holding the next entry
-  private ByteBuffer currentKey;
-  private ByteBuffer currentValue;
-
-  public HoplogSetIterator(List> targets) throws IOException {
-    iters = new ArrayList();
-    for (TrackedReference oplog : targets) {
-      HFileSortedIterator iter = (HFileSortedIterator) oplog.get().getReader().scan();
-      if (!iter.hasNext()) {
-        // the oplog is empty, exclude from iterator
-        continue;
-      }
-
-      // initialize the iterator
-      iter.nextBB();
-      iters.add(iter);
-      entriesRemaining += oplog.get().getReader().getEntryCount();
-    }
-  }
-
-  public boolean hasNext() {
-    return entriesRemaining > 0;
-  }
-
-  @Override
-  public ByteBuffer next() throws IOException {
-    return nextBB();
-  }
-
-  public ByteBuffer nextBB() throws IOException {
-    if (!hasNext()) {
-      throw new NoSuchElementException();
-    }
-
-    seekToMinKeyIter();
-
-    return currentKey;
-  }
-
-  private void seekToMinKeyIter() throws IOException {
-    HFileSortedIterator currentIter = null;
-    ByteBuffer minKey = null;
-
-    // scan through all hoplog iterators to reach to the iterator with smallest
-    // key on the head and remove duplicate keys
-    for (Iterator iterator = iters.iterator(); iterator.hasNext();) {
-      HFileSortedIterator iter = iterator.next();
-
-      ByteBuffer tmpK = iter.getKeyBB();
-      ByteBuffer tmpV = iter.getValueBB();
-      if (minKey == null || ByteComparator.compareBytes(tmpK.array(), tmpK.arrayOffset(), tmpK.remaining(), minKey.array(), minKey.arrayOffset(), minKey.remaining()) < 0) {
-        minKey = tmpK;
-        currentKey = tmpK;
-        currentValue = tmpV;
-        currentIter = iter;
-      } else {
-        // remove possible duplicate key entries from iterator
-        if (seekHigherKeyInIter(minKey, iter) == null) {
-          // no more keys left in this iterator
-          iter.close();
-          iterator.remove();
-        }
-      }
-    }
-
-    //seek next key in current iter
-    if (currentIter != null && seekHigherKeyInIter(minKey, currentIter) == null) {
-      // no more keys left in this iterator
-      currentIter.close();
-      iters.remove(currentIter);
-    }
-  }
-
-  private ByteBuffer seekHigherKeyInIter(ByteBuffer key, HFileSortedIterator iter) throws IOException {
-    ByteBuffer newK = iter.getKeyBB();
-
-    // remove all duplicates by incrementing iterator when a key is less than
-    // equal to current key
-    while (ByteComparator.compareBytes(newK.array(), newK.arrayOffset(), newK.remaining(), key.array(), key.arrayOffset(), key.remaining()) <= 0) {
-      entriesRemaining--;
-      if (iter.hasNext()) {
-        newK = iter.nextBB();
-      } else {
-        newK = null;
-        break;
-      }
-    }
-    return newK;
-  }
-
-  @Override
-  public ByteBuffer getKey() {
-    return getKeyBB();
-  }
-
-  public ByteBuffer getKeyBB() {
-    if (currentKey == null) {
-      throw new IllegalStateException();
-    }
-    return currentKey;
-  }
-
-  @Override
-  public ByteBuffer getValue() {
-    return getValueBB();
-  }
-
-  public ByteBuffer getValueBB() {
-    if (currentValue == null) {
-      throw new IllegalStateException();
-    }
-    return currentValue;
-  }
-
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void close() {
-    for (HoplogIterator iter : iters) {
-      iter.close();
-    }
-  }
-
-  public int getRemainingEntryCount() {
-    return entriesRemaining;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java
deleted file mode 100644
index 789a616..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Reads a sorted oplog file or a merged set of sorted oplogs.
- */
-public interface HoplogSetReader {
-  /**
-   * Returns the value associated with the given key.
-   */
-  V read(K key) throws IOException;
-
-  /**
-   * Iterators over the entire contents of the sorted file.
-   *
-   * @return the sorted iterator
-   * @throws IOException
-   */
-  HoplogIterator scan() throws IOException;
-
-  /**
-   * Scans the available keys and allows iteration over the interval [from, to) where the starting
-   * key is included and the ending key is excluded from the results.
-   *
-   * @param from
-   *          the start key
-   * @param to
-   *          the end key
-   * @return the sorted iterator
-   * @throws IOException
-   */
-  HoplogIterator scan(K from, K to) throws IOException;
-
-  /**
-   * Scans the keys and allows iteration between the given keys.
-   *
-   * @param from
-   *          the start key
-   * @param fromInclusive
-   *          true if the start key is included in the scan
-   * @param to
-   *          the end key
-   * @param toInclusive
-   *          true if the end key is included in the scan
-   * @return the sorted iterator
-   * @throws IOException
-   */
-  HoplogIterator scan(K from, boolean fromInclusive, K to, boolean toInclusive) throws IOException;
-
-
-  /**
-   * Scans the available keys and allows iteration over the offset
-   * specified as parameters
-   *
-   *
-   * @param startOffset
-   *          the start offset
-   * @param length
-   *          bytes to read
-   * @return the sorted iterator
-   * @throws IOException
-   */
-  HoplogIterator scan(long startOffset, long length) throws IOException;
-
-  /**
-   * Using Cardinality estimator provides an approximate number of entries
-   *
-   * @return the number of entries
-   */
-  long sizeEstimate();
-
-  /**
-   * Returns true if the reader has been closed.
-   * @return true if closed
-   */
-  boolean isClosed();
-
-  /**
-   * Allows sorted iteration through a set of keys and values.
-   */
-  public interface HoplogIterator {
-    K getKey();
-
-    V getValue();
-
-    /** moves to next element and returns the key object */
-    K next() throws IOException;
-
-    boolean hasNext();
-
-    void close();
-
-    void remove();
-  }
-}
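To make the scan contract above concrete, a small sketch of iterating the half-open interval [from, to). The key/value types are assumed byte arrays here; the archive stripped the generic parameters from this diff:

    // Sketch only: visits every key in [from, to); 'from' may appear, 'to' never does.
    static void dumpRange(HoplogSetReader<byte[], byte[]> reader, byte[] from, byte[] to)
        throws IOException {
      HoplogSetReader.HoplogIterator<byte[], byte[]> iter = reader.scan(from, to);
      try {
        while (iter.hasNext()) {
          byte[] key = iter.next();       // advances, then returns the key
          byte[] value = iter.getValue(); // value paired with the returned key
        }
      } finally {
        iter.close();
      }
    }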
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
deleted file mode 100644
index a2926ff..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumMap;
-
-import com.gemstone.gemfire.internal.hll.ICardinality;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Reader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import com.gemstone.gemfire.internal.Version;
-
-import org.apache.logging.log4j.Logger;
-
-/**
- * Implements Sequence file based {@link Hoplog}
- *
- *
- */
-public class SequenceFileHoplog extends AbstractHoplog{
-
-  public SequenceFileHoplog(FileSystem inputFS, Path filePath,
-      SortedOplogStatistics stats)
-      throws IOException
-  {
-    super(inputFS, filePath, stats);
-  }
-
-  @Override
-  public void close() throws IOException {
-    // Nothing to do
-  }
-
-  @Override
-  public HoplogReader getReader() throws IOException {
-    return new SequenceFileReader();
-  }
-
-  @Override
-  /**
-   * gets the writer for sequence file.
-   *
-   * @param keys is not used for SequenceFileHoplog class
-   */
-  public HoplogWriter createWriter(int keys) throws IOException {
-    return new SequenceFileHoplogWriter();
-  }
-
-  @Override
-  public boolean isClosed() {
-    return false;
-  }
-
-  @Override
-  public void close(boolean clearCache) throws IOException {
-    // Nothing to do
-  }
-
-  /**
-   * Currently, hsync does not update the file size on namenode. So, if last time the
-   * process died after calling hsync but before calling file close, the file is
-   * left with an inconsistent file size. This is a workaround that - open the file stream in append
-   * mode and close it. This fixes the file size on the namenode.
-   *
-   * @throws IOException
-   * @return true if the file size was fixed
-   */
-  public boolean fixFileSize() throws IOException {
-    // Try to fix the file size
-    // Loop so that the expected expceptions can be ignored 3
-    // times
-    if (logger.isDebugEnabled())
-      logger.debug("{}Fixing size of hoplog " + path, logPrefix);
-    Exception e = null;
-    boolean exceptionThrown = false;
-    for (int i =0; i < 3; i++) {
-      try {
-        FSDataOutputStream stream = fsProvider.getFS().append(path);
-        stream.close();
-        stream = null;
-      } catch (IOException ie) {
-        exceptionThrown = true;
-        e = ie;
-        if (logger.isDebugEnabled())
-          logger.debug("{}Retry run " + (i + 1) + ": Hoplog " + path + " is still a temporary " +
-              "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
-              "fix the hoplog because an exception was thrown " + e, logPrefix );
-      }
-      // As either RecoveryInProgressException was thrown or
-      // Already being created exception was thrown, wait for
-      // sometime before next retry.
-      if (exceptionThrown) {
-        try {
-          Thread.sleep(5000);
-        } catch (InterruptedException e1) {
-        }
-        exceptionThrown = false;
-      } else {
-        // no exception was thrown, break;
-        return true;
-      }
-    }
-    logger.info (logPrefix, LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + path + " is still a temporary " +
-        "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
-        "fix the hoplog because an exception was thrown " + e));
-
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    return "SequenceFileHplog[" + getFileName() + "]";
-  }
-
-  private class SequenceFileHoplogWriter implements HoplogWriter {
-
-    private SequenceFile.Writer writer = null;
-
-    public SequenceFileHoplogWriter() throws IOException{
-      writer = AbstractHoplog.getSequenceFileWriter(path, conf, logger);
-    }
-
-    @Override
-    public void close() throws IOException {
-      writer.close();
-      if (logger.isDebugEnabled())
-        logger.debug("{}Completed creating hoplog " + path, logPrefix);
-    }
-
-    @Override
-    public void hsync() throws IOException {
-      writer.hsyncWithSizeUpdate();
-      if (logger.isDebugEnabled())
-        logger.debug("{}hsync'ed a batch of data to hoplog " + path, logPrefix);
-    }
-
-    @Override
-    public void append(byte[] key, byte[] value) throws IOException {
-      writer.append(new BytesWritable(key), new BytesWritable(value));
-    }
-
-    @Override
-    public void append(ByteBuffer key, ByteBuffer value) throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public void close(EnumMap metadata) throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public long getCurrentSize() throws IOException {
-      return writer.getLength();
-    }
-
-  }
-
-  /**
-   * Sequence file reader. This is currently to be used only by MapReduce jobs and
-   * test functions
-   *
-   */
-  public class SequenceFileReader implements HoplogReader, Closeable {
-    @Override
-    public byte[] read(byte[] key) throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public HoplogIterator scan()
-        throws IOException {
-      return new SequenceFileIterator(fsProvider.getFS(), path, 0, Long.MAX_VALUE, conf, logger);
-    }
-
-    @Override
-    public HoplogIterator scan(
-        byte[] from, byte[] to) throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public HoplogIterator scan(
-        long startOffset, long length) throws IOException {
-      return new SequenceFileIterator(fsProvider.getFS(), path, startOffset, length, conf, logger);
-    }
-
-    @Override
-    public HoplogIterator scan(
-        byte[] from, boolean fromInclusive, byte[] to, boolean toInclusive)
-        throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public boolean isClosed() {
-      throw new UnsupportedOperationException("Not supported for Sequence files.");
-    }
-
-    @Override
-    public void close() throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files. Close the iterator instead.");
-    }
-
-    @Override
-    public ByteBuffer get(byte[] key) throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public BloomFilter getBloomFilter() throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public long getEntryCount() {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public ICardinality getCardinalityEstimator() {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public long sizeEstimate() {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-
-  }
-
-  /**
-   * Sequence file iterator. This is currently to be used only by MapReduce jobs and
-   * test functions
-   *
-   */
-  public static class SequenceFileIterator implements HoplogIterator {
-
-    SequenceFile.Reader reader = null;
-    private BytesWritable prefetchedKey = null;
-    private BytesWritable prefetchedValue = null;
-    private byte[] currentKey;
-    private byte[] currentValue;
-    boolean hasNext = false;
-    Logger logger;
-    Path path;
-    private long start;
-    private long end;
-
-    public SequenceFileIterator(FileSystem fs, Path path, long startOffset,
-        long length, Configuration conf, Logger logger)
-        throws IOException {
-      Reader.Option optPath = SequenceFile.Reader.file(path);
-
-      // Hadoop has a configuration parameter io.serializations that is a list of serialization
-      // classes which can be used for obtaining serializers and deserializers. This parameter
-      // by default contains avro classes. When a sequence file is created, it calls
-      // SerializationFactory.getSerializer(keyclass). This internally creates objects using
-      // reflection of all the classes that were part of io.serializations. But since, there is
-      // no avro class available it throws an exception.
-      // Before creating a sequenceFile, override the io.serializations parameter and pass only the classes
-      // that are important to us.
-      String serializations[] = conf.getStrings("io.serializations",
-          new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
-      conf.setStrings("io.serializations",
-          new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
-      // create reader
-      boolean emptyFile = false;
-      try {
-        reader = new SequenceFile.Reader(conf, optPath);
-      }catch (EOFException e) {
-        // this is ok as the file has ended. just return false that no more records available
-        emptyFile = true;
-      }
-      // reset the configuration to its original value
-      conf.setStrings("io.serializations", serializations);
-      this.logger = logger;
-      this.path = path;
-
-      if (emptyFile) {
-        hasNext = false;
-      } else {
-        // The file should be read from the first sync marker after the start position and
-        // until the first sync marker after the end position is seen.
-        this.end = startOffset + length;
-        if (startOffset > reader.getPosition()) {
-          reader.sync(startOffset); // sync to start
-        }
-        this.start = reader.getPosition();
-        this.hasNext = this.start < this.end;
-        if (hasNext)
-          readNext();
-      }
-    }
-
-
-    public Version getVersion(){
-      String version = reader.getMetadata().get(new Text(Meta.GEMFIRE_VERSION.name())).toString();
-      return Version.fromOrdinalOrCurrent(Short.parseShort(version));
-    }
-
-    @Override
-    public boolean hasNext() {
-      return hasNext;
-    }
-
-    @Override
-    public byte[] next() {
-      currentKey = prefetchedKey.getBytes();
-      currentValue = prefetchedValue.getBytes();
-
-      readNext();
-
-      return currentKey;
-    }
-
-    private void readNext() {
-      try {
-        long pos = reader.getPosition();
-        prefetchedKey = new BytesWritable();
-        prefetchedValue = new BytesWritable();
-        hasNext = reader.next(prefetchedKey, prefetchedValue);
-        // The file should be read from the first sync marker after the start position and
-        // until the first sync marker after the end position is seen.
-        if (pos >= end && reader.syncSeen()) {
-          hasNext = false;
-        }
-      } catch (EOFException e) {
-        // this is ok as the file has ended. just return false that no more records available
-        hasNext = false;
-      }
-      catch (IOException e) {
-        hasNext = false;
-        logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e);
-        throw new HDFSIOException(
-            LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path), e);
-      }
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public void close() {
-      IOUtils.closeStream(reader);
-    }
-
-    @Override
-    public byte[] getKey() {
-      return currentKey;
-    }
-
-    @Override
-    public byte[] getValue() {
-      return currentValue;
-    }
-
-    /** Returns true iff the previous call to next passed a sync mark.*/
-    public boolean syncSeen() { return reader.syncSeen(); }
-
-    /** Return the current byte position in the input file. */
-    public synchronized long getPosition() throws IOException {
-      return reader.getPosition();
-    }
-  }
-}
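The SequenceFileIterator constructor above works around Hadoop's serializer lookup by temporarily overriding io.serializations. The same save/override/restore pattern in isolation, using stock Hadoop Configuration and SequenceFile APIs (the helper name is illustrative):

    // Force WritableSerialization so SerializationFactory never reflects on
    // absent serializer classes (e.g. Avro), then restore the caller's setting.
    static SequenceFile.Reader openReader(Configuration conf, Path path) throws IOException {
      String[] saved = conf.getStrings("io.serializations",
          "org.apache.hadoop.io.serializer.WritableSerialization");
      conf.setStrings("io.serializations",
          "org.apache.hadoop.io.serializer.WritableSerialization");
      try {
        return new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
      } finally {
        conf.setStrings("io.serializations", saved);
      }
    }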
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java
deleted file mode 100644
index f5b63cc..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.lib.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFKey;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HDFSSplitIterator;
-
-public class AbstractGFRecordReader
-    extends
-    com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.AbstractGFRecordReader
-    implements RecordReader {
-
-  /**
-   * Initializes instance of record reader using file split and job
-   * configuration
-   *
-   * @param split
-   * @param conf
-   * @throws IOException
-   */
-  public void initialize(CombineFileSplit split, JobConf conf) throws IOException {
-    CombineFileSplit cSplit = (CombineFileSplit) split;
-    Path[] path = cSplit.getPaths();
-    long[] start = cSplit.getStartOffsets();
-    long[] len = cSplit.getLengths();
-
-    FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
-    this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0l, 0l);
-  }
-
-  @Override
-  public boolean next(GFKey key, PersistedEventImpl value) throws IOException {
-    /*
-     * if there are more records in the hoplog, iterate to the next record. Set
-     * key object as is.
-     */
-
-    if (!super.hasNext()) {
-      key.setKey(null);
-      // TODO make value null;
-      return false;
-    }
-
-    super.next();
-
-    key.setKey(super.getKey().getKey());
-    PersistedEventImpl usersValue = super.getValue();
-    value.copy(usersValue);
-    return true;
-  }
-
-  @Override
-  public GFKey createKey() {
-    return new GFKey();
-  }
-
-  @Override
-  public PersistedEventImpl createValue() {
-    if(this.isSequential) {
-      return new UnsortedHoplogPersistedEvent();
-    } else {
-      return new SortedHoplogPersistedEvent();
-    }
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    // there is no efficient way to find the position of key in hoplog file.
-    return 0;
-  }
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-  }
-
-  @Override
-  public float getProgress() throws IOException {
-    return super.getProgressRatio();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java
deleted file mode 100644
index 0e0e455..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFKey;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil.HoplogOptimizedSplitter;
-
-public class GFInputFormat extends
-    com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFInputFormat
-    implements InputFormat, JobConfigurable {
-
-  @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    this.conf = job;
-
-    Collection hoplogs = getHoplogs();
-    return createSplits(job, hoplogs);
-  }
-
-  /**
-   * Creates an input split for every block occupied by hoplogs of the input
-   * regions
-   *
-   * @param job
-   * @param hoplogs
-   * @return array of input splits of type file input split
-   * @throws IOException
-   */
-  private InputSplit[] createSplits(JobConf job, Collection hoplogs)
-      throws IOException {
-    if (hoplogs == null || hoplogs.isEmpty()) {
-      return new InputSplit[0];
-    }
-
-    HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
-    List mr2Splits = splitter.getOptimizedSplits(conf);
-    InputSplit[] splits = new InputSplit[mr2Splits.size()];
-    int i = 0;
-    for (org.apache.hadoop.mapreduce.InputSplit inputSplit : mr2Splits) {
-      org.apache.hadoop.mapreduce.lib.input.CombineFileSplit mr2Spit;
-      mr2Spit = (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) inputSplit;
-
-      CombineFileSplit split = new CombineFileSplit(job, mr2Spit.getPaths(),
-          mr2Spit.getStartOffsets(), mr2Spit.getLengths(),
-          mr2Spit.getLocations());
-      splits[i] = split;
-      i++;
-    }
-
-    return splits;
-  }
-
-  @Override
-  public RecordReader getRecordReader(
-      InputSplit split, JobConf job, Reporter reporter) throws IOException {
-
-    CombineFileSplit cSplit = (CombineFileSplit) split;
-    AbstractGFRecordReader reader = new AbstractGFRecordReader();
-    reader.initialize(cSplit, job);
-    return reader;
-  }
-
-  @Override
-  public void configure(JobConf job) {
-    this.conf = job;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java
deleted file mode 100644
index 1494e9f..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Progressable;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.client.ClientCache;
-
-/**
- * Output format for gemfire. The records provided to writers created by this
- * output format are PUT in a live gemfire cluster.
- *
- */
-public class GFOutputFormat extends
-    com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFOutputFormat
-    implements OutputFormat {
-
-  @Override
-  public RecordWriter getRecordWriter(
-      FileSystem ignored, JobConf job, String name, Progressable progress)
-      throws IOException {
-    ClientCache cache = getClientCacheInstance(job);
-    return new GFRecordWriter(cache, job);
-  }
-
-  @Override
-  public void checkOutputSpecs(FileSystem ignored, JobConf job)
-      throws IOException {
-    validateConfiguration(job);
-  }
-
-  public class GFRecordWriter implements RecordWriter {
-    private ClientCache clientCache;
-    private Region region;
-
-    public GFRecordWriter(ClientCache cache, Configuration conf) {
-      this.clientCache = cache;
-      region = getRegionInstance(conf, clientCache);
-    }
-
-    @Override
-    public void write(Object key, Object value) throws IOException {
-      executePut(region, key, value);
-    }
-
-    @Override
-    public void close(Reporter reporter) throws IOException {
-      closeClientCache(clientCache);
-      // TODO update reporter
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java
deleted file mode 100644
index 2c71b18..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
-import com.gemstone.gemfire.internal.util.BlobHelper;
-
-public class AbstractGFRecordReader extends
-    RecordReader {
-
-  // constant overhead of each KV in hfile. This is used in computing the
-  // progress of record reader
-  protected long RECORD_OVERHEAD = 8;
-
-  // accounting for number of bytes already read from the hfile
-  private long bytesRead;
-
-  protected boolean isSequential;
-
-  protected HDFSSplitIterator splitIterator;
-
-  @Override
-  public void initialize(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    CombineFileSplit cSplit = (CombineFileSplit) split;
-    Path[] path = cSplit.getPaths();
-    long[] start = cSplit.getStartOffsets();
-    long[] len = cSplit.getLengths();
-
-    Configuration conf = context.getConfiguration();
-    FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
-
-    this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0l, 0l);
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    return next();
-  }
-
-  protected boolean next() throws IOException {
-    if (!hasNext()) {
-      return false;
-    }
-
-    splitIterator.next();
-    bytesRead += (splitIterator.getKey().length + splitIterator.getValue().length);
-    bytesRead += RECORD_OVERHEAD;
-    return true;
-  }
-
-  protected boolean hasNext() throws IOException {
-    return splitIterator.hasNext();
-  }
-
-  @Override
-  public GFKey getCurrentKey() throws IOException, InterruptedException {
-    return getKey();
-  }
-
-  protected GFKey getKey() throws IOException {
-    try {
-      GFKey key = new GFKey();
-      key.setKey(BlobHelper.deserializeBlob(splitIterator.getKey()));
-      return key;
-    } catch (ClassNotFoundException e) {
-      // TODO resolve logging
-      return null;
-    }
-  }
-
-  @Override
-  public PersistedEventImpl getCurrentValue() throws IOException,
-      InterruptedException {
-    return getValue();
-  }
-
-  protected PersistedEventImpl getValue() throws IOException {
-    try {
-      byte[] valueBytes = splitIterator.getValue();
-      if(isSequential) {
-        return UnsortedHoplogPersistedEvent.fromBytes(valueBytes);
-      } else {
-        return SortedHoplogPersistedEvent.fromBytes(valueBytes);
-      }
-    } catch (ClassNotFoundException e) {
-      // TODO resolve logging
-      return null;
-    }
-  }
-
-  @Override
-  public float getProgress() throws IOException, InterruptedException {
-    return getProgressRatio();
-  }
-
-  protected float getProgressRatio() throws IOException {
-    if (!splitIterator.hasNext()) {
-      return 1.0f;
-    } else if (bytesRead > splitIterator.getLength()) {
-      // the record reader is expected to read more number of bytes as it
-      // continues till beginning of next block. hence if extra reading has
-      // started return fixed value
-      return 0.95f;
-    } else {
-      return Math.min(1.0f, bytesRead / (float) (splitIterator.getLength()));
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    splitIterator.close();
-  }
}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java
deleted file mode 100644
index ff64ceb..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil.HoplogOptimizedSplitter;
-
-public class GFInputFormat extends InputFormat
-    implements Configurable {
-
-  public static final String HOME_DIR = "mapreduce.input.gfinputformat.homedir";
-  public static final String INPUT_REGION = "mapreduce.input.gfinputformat.inputregion";
-  public static final String START_TIME = "mapreduce.input.gfinputformat.starttime";
-  public static final String END_TIME = "mapreduce.input.gfinputformat.endtime";
-  public static final String CHECKPOINT = "mapreduce.input.gfinputformat.checkpoint";
-
-  protected Configuration conf;
-
-  @Override
-  public List getSplits(JobContext job) throws IOException {
-    this.conf = job.getConfiguration();
-
-    Collection hoplogs = getHoplogs();
-    return createSplits(hoplogs);
-  }
-
-  /**
-   * Identifies filters provided in the job configuration and creates a list of
-   * sorted hoplogs. If there are no sorted hoplogs, checks if the region has
-   * sequential hoplogs
-   *
-   * @return list of hoplogs
-   * @throws IOException
-   */
-  protected Collection getHoplogs() throws IOException {
-    String regionName = conf.get(INPUT_REGION);
-    System.out.println("GFInputFormat: Region Name is " + regionName);
-    if (regionName == null || regionName.trim().isEmpty()) {
-      // incomplete job configuration, region name must be provided
-      return new ArrayList();
-    }
-
-    String home = conf.get(HOME_DIR, HDFSStore.DEFAULT_HOME_DIR);
-    regionName = HdfsRegionManager.getRegionFolder(regionName);
-    Path regionPath = new Path(home + "/" + regionName);
-    FileSystem fs = regionPath.getFileSystem(conf);
-
-    long start = conf.getLong(START_TIME, 0l);
-    long end = conf.getLong(END_TIME, 0l);
-    boolean checkpoint = conf.getBoolean(CHECKPOINT, true);
-
-    // if the region contains flush hoplogs then the region is of type RW.
-    Collection hoplogs;
-    hoplogs = HoplogUtil.filterHoplogs(fs, regionPath, start, end, checkpoint);
-    return hoplogs == null ? new ArrayList() : hoplogs;
-  }
-
-  /**
-   * Creates an input split for every block occupied by hoplogs of the input
-   * regions
-   *
-   * @param hoplogs
-   * @return list of input splits of type file input split
-   * @throws IOException
-   */
-  private List createSplits(Collection hoplogs)
-      throws IOException {
-    List splits = new ArrayList();
-    if (hoplogs == null || hoplogs.isEmpty()) {
-      return splits;
-    }
-
-    HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
-    return splitter.getOptimizedSplits(conf);
-  }
-
-  @Override
-  public RecordReader createRecordReader(
-      InputSplit split, TaskAttemptContext context) throws IOException,
-      InterruptedException {
-    return new AbstractGFRecordReader();
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-}
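For context, roughly how a MapReduce v2 job would have been pointed at this input format using the configuration keys defined above. The job name, region name, and values are illustrative only:

    Job job = Job.getInstance(new Configuration(), "read-geode-hdfs");
    Configuration conf = job.getConfiguration();
    conf.set(GFInputFormat.INPUT_REGION, "/exampleRegion"); // required
    conf.set(GFInputFormat.HOME_DIR, "gemfire");            // optional; defaults to HDFSStore.DEFAULT_HOME_DIR
    conf.setLong(GFInputFormat.START_TIME, 0L);             // optional time-range filter
    conf.setBoolean(GFInputFormat.CHECKPOINT, true);        // prefer checkpoint hoplogs when present
    job.setInputFormatClass(GFInputFormat.class);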
- */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; - -import com.gemstone.gemfire.internal.util.BlobHelper; - -public class GFKey implements WritableComparable { - private Object key; - - public Object getKey() { - return key; - } - - public void setKey(Object key) { - this.key = key; - } - - @Override - public void write(DataOutput out) throws IOException { - byte[] bytes = BlobHelper.serializeToBlob(key); - out.writeInt(bytes.length); - out.write(bytes, 0, bytes.length); - } - - @Override - public void readFields(DataInput in) throws IOException { - int len = in.readInt(); - byte[] bytes = new byte[len]; - in.readFully(bytes, 0, len); - try { - key = BlobHelper.deserializeBlob(bytes); - } catch (ClassNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - @Override - public int compareTo(GFKey o) { - try { - byte[] b1 = BlobHelper.serializeToBlob(key); - byte[] b2 = BlobHelper.serializeToBlob(o.key); - return WritableComparator.compareBytes(b1, 0, b1.length, b2, 0, b2.length); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java deleted file mode 100644 index 3be2ab0..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.InvalidJobConfException; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.RegionExistsException; -import com.gemstone.gemfire.cache.client.ClientCache; -import com.gemstone.gemfire.cache.client.ClientCacheFactory; -import com.gemstone.gemfire.cache.client.ClientRegionFactory; -import com.gemstone.gemfire.cache.client.ClientRegionShortcut; -import com.gemstone.gemfire.cache.server.CacheServer; -import com.gemstone.gemfire.management.internal.cli.converters.ConnectionEndpointConverter; - -/** - * Output format for gemfire. The records provided to writers created by this - * output format are PUT in a live gemfire cluster. - * - */ -public class GFOutputFormat extends OutputFormat { - public static final String REGION = "mapreduce.output.gfoutputformat.outputregion"; - public static final String LOCATOR_HOST = "mapreduce.output.gfoutputformat.locatorhost"; - public static final String LOCATOR_PORT = "mapreduce.output.gfoutputformat.locatorport"; - public static final String SERVER_HOST = "mapreduce.output.gfoutputformat.serverhost"; - public static final String SERVER_PORT = "mapreduce.output.gfoutputformat.serverport"; - - @Override - public RecordWriter getRecordWriter(TaskAttemptContext context) - throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - ClientCache cache = getClientCacheInstance(conf); - return new GFRecordWriter(cache, context.getConfiguration()); - } - - public ClientCache getClientCacheInstance(Configuration conf) { - // if locator host is provided create a client cache instance using - // connection to locator. 
-
-  /**
-   * Creates an instance of {@link ClientCache} by connecting to the GemFire
-   * cluster through a locator.
-   */
-  public ClientCache createGFWriterUsingLocator(Configuration conf) {
-    // if the locator host is not provided, assume localhost
-    String locator = conf.get(LOCATOR_HOST,
-        ConnectionEndpointConverter.DEFAULT_LOCATOR_HOST);
-    // if the locator port is not provided, assume the default locator port, 10334
-    int port = conf.getInt(LOCATOR_PORT,
-        ConnectionEndpointConverter.DEFAULT_LOCATOR_PORT);
-
-    // create a gemfire client cache instance
-    ClientCacheFactory ccf = new ClientCacheFactory();
-    ccf.addPoolLocator(locator, port);
-    ClientCache cache = ccf.create();
-    return cache;
-  }
-
-  /**
-   * Creates an instance of {@link ClientCache} by connecting to the GemFire
-   * cluster through a GemFire server.
-   */
-  public ClientCache createGFWriterUsingServer(Configuration conf) {
-    String server = conf.get(SERVER_HOST);
-    // if the server port is not provided, assume the default server port, 40404
-    int port = conf.getInt(SERVER_PORT, CacheServer.DEFAULT_PORT);
-
-    // create a gemfire client cache instance
-    ClientCacheFactory ccf = new ClientCacheFactory();
-    ccf.addPoolServer(server, port);
-    ClientCache cache = ccf.create();
-    return cache;
-  }
-
-  public Region<Object, Object> getRegionInstance(Configuration conf,
-      ClientCache cache) {
-    Region<Object, Object> region;
-
-    // create a gemfire region in proxy mode
-    String regionName = conf.get(REGION);
-    ClientRegionFactory<Object, Object> regionFactory = cache
-        .createClientRegionFactory(ClientRegionShortcut.PROXY);
-    try {
-      region = regionFactory.create(regionName);
-    } catch (RegionExistsException e) {
-      region = cache.getRegion(regionName);
-    }
-
-    return region;
-  }
-
-  /**
-   * Puts a key-value pair into the region.
-   */
-  public void executePut(Region<Object, Object> region, Object key, Object value) {
-    region.put(key, value);
-  }
-
-  /**
-   * Closes the client cache instance.
-   */
-  public void closeClientCache(ClientCache clientCache) {
-    if (clientCache != null && !clientCache.isClosed()) {
-      clientCache.close();
-    }
-  }
-
-  /**
-   * Validates correctness and completeness of the job's output configuration.
-   */
-  protected void validateConfiguration(Configuration conf)
-      throws InvalidJobConfException {
-    // the user must configure the output region name
-    String region = conf.get(REGION);
-    if (region == null || region.trim().isEmpty()) {
-      throw new InvalidJobConfException("Output region name not provided.");
-    }
-
-    // TODO validate if a client connected to gemfire cluster can be created
-  }
-
-  @Override
-  public void checkOutputSpecs(JobContext context) throws IOException,
-      InterruptedException {
-    Configuration conf = context.getConfiguration();
-    validateConfiguration(conf);
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
-        context);
-  }
-
-  public class GFRecordWriter extends RecordWriter<Object, Object> {
-    private ClientCache clientCache;
-    private Region<Object, Object> region;
-
-    public GFRecordWriter(ClientCache cache, Configuration conf) {
-      this.clientCache = cache;
-      region = getRegionInstance(conf, clientCache);
-    }
-
-    @Override
-    public void write(Object key, Object value) throws IOException,
-        InterruptedException {
-      executePut(region, key, value);
-    }
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException,
-        InterruptedException {
-      closeClientCache(clientCache);
-    }
-  }
-}
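For reference, a minimal sketch of how this output format was wired into a Hadoop job using the configuration keys declared above; the region and locator values are placeholders and the driver class is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFOutputFormat;

    // Hypothetical job setup; the keys are the constants declared by
    // GFOutputFormat above. Only REGION is required; with no server or
    // locator host set, the format falls back to the default locator.
    public class GFOutputJobSetup {
      public static Job createJob() throws Exception {
        Configuration conf = new Configuration();
        conf.set(GFOutputFormat.REGION, "exampleRegion");         // required
        conf.set(GFOutputFormat.LOCATOR_HOST, "locator.example"); // optional
        conf.setInt(GFOutputFormat.LOCATOR_PORT, 10334);          // optional
        Job job = Job.getInstance(conf);
        job.setOutputFormatClass(GFOutputFormat.class);
        return job;
      }
    }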
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
deleted file mode 100644
index 869ad0d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * Iterates over the records in part of a hoplog. This iterator is handed from
- * the MapReduce job to the GemFireXD LanguageConnectionContext, which uses it
- * as the record iterator during the map phase.
- */
-public abstract class HDFSSplitIterator {
-  // data object holding the path, offset, and length of all the blocks this
-  // iterator needs to iterate over
-  private CombineFileSplit split;
-
-  // the following members point to the hoplog currently being iterated over
-  private int currentHopIndex = 0;
-  private AbstractHoplog hoplog;
-  protected HoplogIterator<byte[], byte[]> iterator;
-  byte[] key;
-  byte[] value;
-
-  private long bytesRead;
-  protected long RECORD_OVERHEAD = 8;
-
-  private long startTime = 0L;
-  private long endTime = 0L;
-
-  protected FileSystem fs;
-  private static final Logger logger = LogService.getLogger();
-  protected final String logPrefix = "<" + "HDFSSplitIterator" + "> ";
-
-  public HDFSSplitIterator(FileSystem fs, Path[] paths, long[] offsets,
-      long[] lengths, long startTime, long endTime) throws IOException {
-    this.fs = fs;
-    this.split = new CombineFileSplit(paths, offsets, lengths, null);
-    while (currentHopIndex < split.getNumPaths()
-        && !fs.exists(split.getPath(currentHopIndex))) {
-      logger.warn(LocalizedMessage.create(
-          LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR,
-          split.getPath(currentHopIndex)));
-      currentHopIndex++;
-    }
-    if (currentHopIndex == split.getNumPaths()) {
-      this.hoplog = null;
-      iterator = null;
-    } else {
-      this.hoplog = getHoplog(fs, split.getPath(currentHopIndex));
-      iterator = hoplog.getReader().scan(split.getOffset(currentHopIndex),
-          split.getLength(currentHopIndex));
-    }
-    this.startTime = startTime;
-    this.endTime = endTime;
-  }
-
-  /**
-   * Gets the appropriate iterator for the file type.
-   */
-  public static HDFSSplitIterator newInstance(FileSystem fs, Path[] path,
-      long[] start, long[] len, long startTime, long endTime)
-      throws IOException {
-    String fileName = path[0].getName();
-    if (fileName.endsWith(AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION)) {
-      return new StreamSplitIterator(fs, path, start, len, startTime, endTime);
-    } else {
-      return new RWSplitIterator(fs, path, start, len, startTime, endTime);
-    }
-  }
-
-  public final boolean hasNext() throws IOException {
-    while (currentHopIndex < split.getNumPaths()) {
-      if (iterator != null) {
-        if (iterator.hasNext()) {
-          return true;
-        } else {
-          iterator.close();
-          iterator = null;
-          hoplog.close();
-          hoplog = null;
-        }
-      }
-
-      if (iterator == null) {
-        // The iterator is null on the first read, or once all entries from
-        // the previous iterator have been read, so create an iterator on
-        // the next hoplog.
-        currentHopIndex++;
-        while (currentHopIndex < split.getNumPaths()
-            && !fs.exists(split.getPath(currentHopIndex))) {
-          logger.warn(LocalizedMessage.create(
-              LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR,
-              split.getPath(currentHopIndex).toString()));
-          currentHopIndex++;
-        }
-        if (currentHopIndex >= split.getNumPaths()) {
-          return false;
-        }
-        hoplog = getHoplog(fs, split.getPath(currentHopIndex));
-        iterator = hoplog.getReader().scan(split.getOffset(currentHopIndex),
-            split.getLength(currentHopIndex));
-      }
-    }
-
-    return false;
-  }
-
-  public final boolean next() throws IOException {
-    while (hasNext()) {
-      key = iterator.next();
-      value = iterator.getValue();
-      bytesRead += (key.length + value.length);
-      bytesRead += RECORD_OVERHEAD;
-
-      // If a time filter is set, check whether the event's timestamp matches
-      // it. The events returned by the iterator may not be time ordered, so
-      // the filters must be checked for every record.
-      if (startTime > 0 || endTime > 0) {
-        try {
-          PersistedEventImpl event = getDeserializedValue();
-          long timestamp = event.getTimstamp();
-          if (startTime > 0L && timestamp < startTime) {
-            continue;
-          }
-
-          if (endTime > 0L && timestamp > endTime) {
-            continue;
-          }
-        } catch (ClassNotFoundException e) {
-          throw new HDFSIOException("Error reading from HDFS", e);
-        }
-      }
-
-      return true;
-    }
-
-    return false;
-  }
-
-  public final long getBytesRead() {
-    return this.bytesRead;
-  }
-
-  public final byte[] getKey() {
-    return key;
-  }
-
-  public abstract PersistedEventImpl getDeserializedValue()
-      throws ClassNotFoundException, IOException;
-
-  protected abstract AbstractHoplog getHoplog(FileSystem fs, Path path)
-      throws IOException;
-
-  public final byte[] getValue() {
-    return value;
-  }
-
-  public final long getLength() {
-    return split.getLength();
-  }
-
-  public void close() throws IOException {
-    if (iterator != null) {
-      iterator.close();
-      iterator = null;
-    }
-
-    if (hoplog != null) {
-      hoplog.close();
-      hoplog = null;
-    }
-  }
-}
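For reference, a hedged sketch of the consumption pattern this class supports: next() advances past records that fail the optional time filter, and getKey()/getValue() expose the raw bytes of the current record. The paths, lengths, and wrapper class below are placeholders:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HDFSSplitIterator;

    // Hypothetical consumer of the HDFSSplitIterator API shown above.
    public class SplitScan {
      public static long countBytes(FileSystem fs, Path hoplog, long len)
          throws Exception {
        // Zero start/end times disable the timestamp filter.
        HDFSSplitIterator it = HDFSSplitIterator.newInstance(
            fs, new Path[] { hoplog }, new long[] { 0 }, new long[] { len },
            0, 0);
        try {
          while (it.next()) {
            byte[] key = it.getKey();     // raw serialized key
            byte[] value = it.getValue(); // raw serialized value
          }
          return it.getBytesRead();
        } finally {
          it.close();
        }
      }
    }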
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
deleted file mode 100644
index c4c0d1c..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer.HoplogComparator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
-
-public class HoplogUtil {
-  /**
-   * @param regionPath
-   *          HDFS path of the region
-   * @param fs
-   *          file system associated with the region
-   * @param type
-   *          type of hoplog to be fetched; flush hoplog or sequence hoplog
-   * @return all hoplog file paths belonging to the region provided
-   * @throws IOException
-   */
-  public static Collection<FileStatus> getAllRegionHoplogs(Path regionPath,
-      FileSystem fs, String type) throws IOException {
-    return getRegionHoplogs(regionPath, fs, type, 0, 0);
-  }
-
-  /**
-   * @param regionPath
-   *          region path
-   * @param fs
-   *          file system associated with the region
-   * @param type
-   *          type of hoplog to be fetched; flush hoplog or sequence hoplog
-   * @param start
-   *          exclude files that do not contain records mutated after start time
-   * @param end
-   *          exclude files that do not contain records mutated before end time
-   * @return all hoplog file paths belonging to the region provided
-   * @throws IOException
-   */
-  public static Collection<FileStatus> getRegionHoplogs(Path regionPath,
-      FileSystem fs, String type, long start, long end) throws IOException {
-    Collection<Collection<FileStatus>> allBuckets = getBucketHoplogs(
-        regionPath, fs, type, start, end);
-
-    ArrayList<FileStatus> hoplogs = new ArrayList<FileStatus>();
-    for (Collection<FileStatus> bucket : allBuckets) {
-      for (FileStatus file : bucket) {
-        hoplogs.add(file);
-      }
-    }
-    return hoplogs;
-  }
-
-  public static Collection<Collection<FileStatus>> getBucketHoplogs(
-      Path regionPath, FileSystem fs, String type, long start, long end)
-      throws IOException {
-    Collection<Collection<FileStatus>> allBuckets =
-        new ArrayList<Collection<FileStatus>>();
-
-    // hoplog file names follow this pattern
-    String HOPLOG_NAME_REGEX = AbstractHoplogOrganizer.HOPLOG_NAME_REGEX + type;
-    String EXPIRED_HOPLOG_NAME_REGEX = HOPLOG_NAME_REGEX
-        + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
-    final Pattern pattern = Pattern.compile(HOPLOG_NAME_REGEX);
-    final Pattern expiredPattern = Pattern.compile(EXPIRED_HOPLOG_NAME_REGEX);
-
-    Path cleanUpIntervalPath = new Path(regionPath.getParent(),
-        HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
-    long intervalDurationMillis = readCleanUpIntervalMillis(fs,
-        cleanUpIntervalPath);
-
-    // a region directory contains directories for individual buckets; a
-    // bucket has an integer name
-    FileStatus[] bucketDirs = fs.listStatus(regionPath);
-
-    for (FileStatus bucket : bucketDirs) {
-      if (!bucket.isDirectory()) {
-        continue;
-      }
-      try {
-        Integer.valueOf(bucket.getPath().getName());
-      } catch (NumberFormatException e) {
-        continue;
-      }
-
-      ArrayList<FileStatus> bucketHoplogs = new ArrayList<FileStatus>();
-
-      // identify all the flush hoplogs and seq hoplogs by visiting all the
-      // bucket directories
-      FileStatus[] bucketFiles = fs.listStatus(bucket.getPath());
-
-      Map<String, Long> expiredHoplogs = getExpiredHoplogs(fs, bucketFiles,
-          expiredPattern);
-
-      FileStatus oldestHopAfterEndTS = null;
-      long oldestHopTS = Long.MAX_VALUE;
-      long currentTimeStamp = System.currentTimeMillis();
-      for (FileStatus file : bucketFiles) {
-        if (!file.isFile()) {
-          continue;
-        }
-
-        Matcher match = pattern.matcher(file.getPath().getName());
-        if (!match.matches()) {
-          continue;
-        }
-
-        long timeStamp = AbstractHoplogOrganizer.getHoplogTimestamp(match);
-        if (start > 0 && timeStamp < start) {
-          // this hoplog contains records older than the start time stamp
-          continue;
-        }
-
-        if (end > 0 && timeStamp > end) {
-          // This hoplog contains records mutated after the end time stamp.
-          // Ignore this hoplog if it is not the oldest.
-          if (oldestHopTS > timeStamp) {
-            oldestHopTS = timeStamp;
-            oldestHopAfterEndTS = file;
-          }
-          continue;
-        }
-        long expiredTimeStamp = expiredTime(file, expiredHoplogs);
-        if (expiredTimeStamp > 0 && intervalDurationMillis > 0) {
-          if ((currentTimeStamp - expiredTimeStamp) > 0.8 * intervalDurationMillis) {
-            continue;
-          }
-        }
-        bucketHoplogs.add(file);
-      }
-
-      if (oldestHopAfterEndTS != null) {
-        long expiredTimeStamp = expiredTime(oldestHopAfterEndTS, expiredHoplogs);
-        if (expiredTimeStamp <= 0 || intervalDurationMillis <= 0
-            || (currentTimeStamp - expiredTimeStamp) <= 0.8 * intervalDurationMillis) {
-          bucketHoplogs.add(oldestHopAfterEndTS);
-        }
-      }
-
-      if (bucketHoplogs.size() > 0) {
-        allBuckets.add(bucketHoplogs);
-      }
-    }
-
-    return allBuckets;
-  }
-
-  private static Map<String, Long> getExpiredHoplogs(FileSystem fs,
-      FileStatus[] bucketFiles, Pattern expiredPattern) throws IOException {
-    Map<String, Long> expiredHoplogs = new HashMap<String, Long>();
-
-    for (FileStatus file : bucketFiles) {
-      if (!file.isFile()) {
-        continue;
-      }
-      String fileName = file.getPath().getName();
-      Matcher match = expiredPattern.matcher(fileName);
-      if (!match.matches()) {
-        continue;
-      }
-      expiredHoplogs.put(fileName, file.getModificationTime());
-    }
-    return expiredHoplogs;
-  }
-
-  private static long expiredTime(FileStatus file,
-      Map<String, Long> expiredHoplogs) {
-    String expiredMarkerName = file.getPath().getName()
-        + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
-
-    long expiredTimeStamp = -1;
-    if (expiredHoplogs.containsKey(expiredMarkerName)) {
-      expiredTimeStamp = expiredHoplogs.get(expiredMarkerName);
-    }
-    return expiredTimeStamp;
-  }
-
-  public static long readCleanUpIntervalMillis(FileSystem fs,
-      Path cleanUpIntervalPath) throws IOException {
-    if (fs.exists(cleanUpIntervalPath)) {
-      FSDataInputStream input = fs.open(cleanUpIntervalPath);
-      long intervalDurationMillis = input.readLong();
-      input.close();
-      return intervalDurationMillis;
-    } else {
-      return -1L;
-    }
-  }
-
-  public static void exposeCleanupIntervalMillis(FileSystem fs, Path path,
-      long intervalDurationMillis) {
-    FSDataInputStream input = null;
-    FSDataOutputStream output = null;
-    try {
-      if (fs.exists(path)) {
-        input = fs.open(path);
-        if (intervalDurationMillis == input.readLong()) {
-          input.close();
-          return;
-        }
-        input.close();
-        fs.delete(path, true);
-      }
-      output = fs.create(path);
-      output.writeLong(intervalDurationMillis);
-      output.close();
-    } catch (IOException e) {
-      // best effort; leave any previous interval file in place on failure
-      return;
-    } finally {
-      try {
-        if (input != null) {
-          input.close();
-        }
-        if (output != null) {
-          output.close();
-        }
-      } catch (IOException e2) {
-        // ignore failures while closing the streams
-      }
-    }
-  }
-
-  /**
-   * @param regionPath
-   *          HDFS path of the region
-   * @param fs
-   *          file system associated with the region
-   * @return list of the latest checkpoint files of all buckets in the region
-   * @throws IOException
-   */
-  public static Collection<FileStatus> getCheckpointFiles(Path regionPath,
-      FileSystem fs) throws IOException {
-    ArrayList<FileStatus> latestSnapshots = new ArrayList<FileStatus>();
-
-    Collection<Collection<FileStatus>> allBuckets = getBucketHoplogs(
-        regionPath, fs, AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION, 0, 0);
-
-    // extract the latest major compacted hoplog from each bucket
-    for (Collection<FileStatus> bucket : allBuckets) {
-      FileStatus latestSnapshot = null;
-      for (FileStatus file : bucket) {
-        if (latestSnapshot == null) {
-          latestSnapshot = file;
-        } else {
-          String name1 = latestSnapshot.getPath().getName();
-          String name2 = file.getPath().getName();
-
-          if (HoplogComparator.compareByName(name1, name2) > 0) {
-            latestSnapshot = file;
-          }
-        }
-      }
-
-      if (latestSnapshot != null) {
-        latestSnapshots.add(latestSnapshot);
-      }
-    }
-
-    return latestSnapshots;
-  }
-
-  /**
-   * Creates a mapping of hoplogs to their HDFS blocks on disk.
-   *
-   * @param files
-   *          list of hoplog file status objects
-   * @return a map from each hoplog to the HDFS block locations associated
-   *         with it
-   * @throws IOException
-   */
-  public static Map<FileStatus, BlockLocation[]> getBlocks(Configuration config,
-      Collection<FileStatus> files) throws IOException {
-    Map<FileStatus, BlockLocation[]> blocks =
-        new HashMap<FileStatus, BlockLocation[]>();
-    if (files == null || files.isEmpty()) {
-      return blocks;
-    }
-
-    FileSystem fs = files.iterator().next().getPath().getFileSystem(config);
-
-    for (FileStatus hoplog : files) {
-      long length = hoplog.getLen();
-      BlockLocation[] fileBlocks = fs.getFileBlockLocations(hoplog, 0, length);
-      blocks.put(hoplog, fileBlocks);
-    }
-
-    return blocks;
-  }
-
-  /**
-   * Filters out hoplogs of a region that do not match the time filters and
-   * creates a list of hoplogs that may be used by hadoop jobs.
-   *
-   * @param fs
-   *          file system instance
-   * @param path
-   *          region path
-   * @param start
-   *          start time in milliseconds
-   * @param end
-   *          end time in milliseconds
-   * @param snapshot
-   *          if true the latest snapshot hoplog will be included in the final
-   *          return list
-   * @return filtered collection of hoplogs
-   * @throws IOException
-   */
-  public static Collection<FileStatus> filterHoplogs(FileSystem fs, Path path,
-      long start, long end, boolean snapshot) throws IOException {
-    ArrayList<FileStatus> hoplogs = new ArrayList<FileStatus>();
-
-    // If the region contains flush hoplogs or major compacted files, the
-    // region is of type RW. Check whether the intent is to operate on major
-    // compacted files only.
-    if (snapshot) {
-      hoplogs.addAll(getCheckpointFiles(path, fs));
-    } else {
-      hoplogs.addAll(getRegionHoplogs(path, fs,
-          AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, start, end));
-    }
-
-    if (hoplogs.isEmpty()) {
-      // There are no sorted hoplogs; check whether sequence hoplogs are
-      // present. There is no checkpoint mode for write-only tables.
-      hoplogs.addAll(getRegionHoplogs(path, fs,
-          AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION, start, end));
-    }
-
-    return hoplogs;
-  }
-
-  private HoplogUtil() {
-    // static methods only
-  }
-
-  /**
-   * Creates MR splits from hoplog files. This class leverages
-   * CombineFileInputFormat to create locality-aware (node- and rack-level)
-   * splits.
-   */
-  public static class HoplogOptimizedSplitter extends
-      CombineFileInputFormat<Object, Object> {
-    private Collection<FileStatus> hoplogs;
-
-    public HoplogOptimizedSplitter(Collection<FileStatus> hoplogs) {
-      this.hoplogs = hoplogs;
-    }
-
-    @Override
-    protected List<FileStatus> listStatus(JobContext job) throws IOException {
-      // listStatus in the superclass collects a FileStatus for each file
-      // again, and also tries to recursively list files in subdirectories.
-      // Neither is applicable here: the splitter has already collected a
-      // FileStatus for every file, so bypassing the superclass improves
-      // performance by reducing NameNode chatter. This is especially helpful
-      // when the NameNode is not colocated.
-      return new ArrayList<FileStatus>(hoplogs);
-    }
-
-    /**
-     * Creates an array of splits for the input list of hoplogs. Each split is
-     * roughly the size of an HDFS block. The HDFS blocks of a hoplog may be
-     * smaller than the HDFS block size, e.g. if the hoplog is very small. The
-     * method keeps adding the HDFS blocks of a hoplog to a split until the
-     * split is less than the HDFS block size and the block is local to the
-     * split.
-     */
-    public List<InputSplit> getOptimizedSplits(Configuration conf)
-        throws IOException {
-      if (hoplogs == null || hoplogs.isEmpty()) {
-        return null;
-      }
-      Path[] paths = new Path[hoplogs.size()];
-      int i = 0;
-      for (FileStatus file : hoplogs) {
-        paths[i] = file.getPath();
-        i++;
-      }
-
-      FileStatus hoplog = hoplogs.iterator().next();
-      long blockSize = hoplog.getBlockSize();
-      setMaxSplitSize(blockSize);
-
-      Job job = Job.getInstance(conf);
-      setInputPaths(job, paths);
-      List<InputSplit> splits = super.getSplits(job);
-
-      // In some cases a split may not get populated with host location
-      // information. If such a split is created, fill in the location
-      // information of the first file in the split.
-      ArrayList<InputSplit> newSplits = new ArrayList<InputSplit>();
-      for (Iterator<InputSplit> iter = splits.iterator(); iter.hasNext();) {
-        CombineFileSplit split = (CombineFileSplit) iter.next();
-        if (split.getLocations() != null && split.getLocations().length > 0) {
-          continue;
-        }
-
-        paths = split.getPaths();
-        if (paths.length == 0) {
-          continue;
-        }
-        long[] starts = split.getStartOffsets();
-        long[] lengths = split.getLengths();
-
-        FileSystem fs = paths[0].getFileSystem(conf);
-        FileStatus file = fs.getFileStatus(paths[0]);
-        BlockLocation[] blks = fs.getFileBlockLocations(file, starts[0],
-            lengths[0]);
-        if (blks != null && blks.length > 0) {
-          // hosts found; create a new split to replace the one missing hosts
-          iter.remove();
-          String[] hosts = blks[0].getHosts();
-          split = new CombineFileSplit(paths, starts, lengths, hosts);
-          newSplits.add(split);
-        }
-      }
-      splits.addAll(newSplits);
-
-      return splits;
-    }
-
-    @Override
-    public List<InputSplit> getSplits(JobContext job) throws IOException {
-      // A call to this method is invalid; this class is only meant to create
-      // optimized splits independent of the API type.
-      throw new IllegalStateException();
-    }
-
-    @Override
-    public RecordReader<Object, Object> createRecordReader(InputSplit split,
-        TaskAttemptContext arg1) throws IOException {
-      // Record reader creation is managed by GFInputFormat; this method
-      // should not be called.
-      throw new IllegalStateException();
-    }
-  }
-}
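For reference, a sketch of how these utilities composed: filter a region's hoplogs by time window, then hand them to HoplogOptimizedSplitter for locality-aware splits. The wrapper class and region path are hypothetical:

    import java.util.Collection;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.InputSplit;

    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;

    // Hypothetical composition of the HoplogUtil helpers shown above.
    public class HoplogSplitExample {
      public static List<InputSplit> splitsFor(Configuration conf,
          String regionDir) throws Exception {
        Path region = new Path(regionDir);
        FileSystem fs = region.getFileSystem(conf);
        // Zero start/end times disable the time window; snapshot=false
        // selects flush hoplogs and falls back to sequence hoplogs.
        Collection<FileStatus> hoplogs =
            HoplogUtil.filterHoplogs(fs, region, 0, 0, false);
        return new HoplogUtil.HoplogOptimizedSplitter(hoplogs)
            .getOptimizedSplits(conf);
      }
    }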