phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: PHOENIX-1216 fix code Change spooling directory
Date Tue, 16 Sep 2014 10:27:14 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master 20e7559b4 -> 173c7d72d


PHOENIX-1216 fix code Change spooling directory

Make the spooling directory configurable.

Signed-off-by: Gabriel Reid <gabrielr@ngdata.com>


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/173c7d72
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/173c7d72
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/173c7d72

Branch: refs/heads/master
Commit: 173c7d72defbb5a73a36fd5c8c12c8e621295e70
Parents: 20e7559
Author: sofangel <sofangel@naver.com>
Authored: Tue Sep 16 15:31:34 2014 +0900
Committer: Gabriel Reid <gabrielr@ngdata.com>
Committed: Tue Sep 16 12:25:47 2014 +0200

----------------------------------------------------------------------
 .../phoenix/iterate/SpoolingResultIterator.java | 57 ++++++++++----------
 .../org/apache/phoenix/query/QueryServices.java |  2 +-
 .../phoenix/query/QueryServicesOptions.java     | 11 +++-
 .../iterate/SpoolingResultIteratorTest.java     |  2 +-
 .../phoenix/query/QueryServicesTestImpl.java    |  2 +
 5 files changed, 42 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/173c7d72/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 4672657..42a61a7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -47,19 +47,19 @@ import org.apache.phoenix.util.TupleUtil;
 
 
 /**
- * 
+ *
  * Result iterator that spools the results of a scan to disk once an in-memory threshold has been reached.
  * If the in-memory threshold is not reached, the results are held in memory with no disk writing perfomed.
  *
- * 
+ *
  * @since 0.1
  */
 public class SpoolingResultIterator implements PeekingResultIterator {
     private final PeekingResultIterator spoolFrom;
-    
+
     public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory {
         private final QueryServices services;
-        
+
         public SpoolingResultIteratorFactory(QueryServices services) {
             this.services = services;
         }
@@ -67,15 +67,16 @@ public class SpoolingResultIterator implements PeekingResultIterator {
         public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
             return new SpoolingResultIterator(scanner, services);
         }
-        
+
     }
 
     public SpoolingResultIterator(ResultIterator scanner, QueryServices services) throws SQLException {
-        this (scanner, services.getMemoryManager(), 
-        		services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES),
-        		services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES));
+        this (scanner, services.getMemoryManager(),
+                services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES),
+                services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES),
+                services.getProps().get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY));
     }
-    
+
     /**
     * Create a result iterator by iterating through the results of a scan, spooling them to disk once
     * a threshold has been reached. The scanner passed in is closed prior to returning.
@@ -85,7 +86,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
     *  the memory manager) is exceeded.
     * @throws SQLException
     */
-    SpoolingResultIterator(ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk) throws SQLException {
+    SpoolingResultIterator(ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException {
         boolean success = false;
         boolean usedOnDiskIterator = false;
         final MemoryChunk chunk = mm.allocate(0, thresholdBytes);
@@ -93,7 +94,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
         try {
             // Can't be bigger than int, since it's the max of the above allocation
             int size = (int)chunk.getSize();
-            tempFile = File.createTempFile("ResultSpooler",".bin");
+            tempFile = File.createTempFile("ResultSpooler",".bin", new File(spoolDirectory));
             DeferredFileOutputStream spoolTo = new DeferredFileOutputStream(size, tempFile) {
                 @Override
                 protected void thresholdReached() throws IOException {
@@ -102,7 +103,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
                 }
             };
             DataOutputStream out = new DataOutputStream(spoolTo);
-            final long maxBytesAllowed = maxSpoolToDisk == -1 ? 
+            final long maxBytesAllowed = maxSpoolToDisk == -1 ?
             		Long.MAX_VALUE : thresholdBytes + maxSpoolToDisk;
             long bytesWritten = 0L;
             int maxSize = 0;
@@ -152,17 +153,17 @@ public class SpoolingResultIterator implements PeekingResultIterator {
     public Tuple next() throws SQLException {
         return spoolFrom.next();
     }
-    
+
     @Override
     public void close() throws SQLException {
         spoolFrom.close();
     }
 
     /**
-     * 
+     *
      * Backing result iterator if it was not necessary to spool results to disk.
      *
-     * 
+     *
      * @since 0.1
      */
     private static class InMemoryResultIterator implements PeekingResultIterator {
@@ -170,7 +171,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
         private final byte[] bytes;
         private Tuple next;
         private int offset;
-        
+
         private InMemoryResultIterator(byte[] bytes, MemoryChunk memoryChunk) throws SQLException {
             this.bytes = bytes;
             this.memoryChunk = memoryChunk;
@@ -188,7 +189,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
             Tuple result = new ResultTuple(ResultUtil.toResult(value));
             return next = result;
         }
-        
+
         @Override
         public Tuple peek() throws SQLException {
             return next;
@@ -200,7 +201,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
             advance();
             return current;
         }
-        
+
         @Override
         public void close() {
             memoryChunk.close();
@@ -210,12 +211,12 @@ public class SpoolingResultIterator implements PeekingResultIterator {
         public void explain(List<String> planSteps) {
         }
     }
-    
+
     /**
-     * 
+     *
      * Backing result iterator if results were spooled to disk
      *
-     * 
+     *
      * @since 0.1
      */
     private static class OnDiskResultIterator implements PeekingResultIterator {
@@ -226,12 +227,12 @@ public class SpoolingResultIterator implements PeekingResultIterator {
         private int bufferIndex;
         private byte[][] buffers = new byte[2][];
         private boolean isClosed;
-        
+
         private OnDiskResultIterator (int maxSize, File file) {
             this.file = file;
             this.maxSize = maxSize;
         }
-        
+
         private synchronized void init() throws IOException {
             if (spoolFrom == null) {
                 spoolFrom = new DataInputStream(new BufferedInputStream(new FileInputStream(file)));
@@ -241,7 +242,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
                 advance();
             }
         }
-    
+
         private synchronized void reachedEnd() throws IOException {
             next = null;
             isClosed = true;
@@ -253,7 +254,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
                 file.delete();
             }
         }
-        
+
         private synchronized Tuple advance() throws IOException {
             if (isClosed) {
                 return next;
@@ -282,7 +283,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
             next = new ResultTuple(ResultUtil.toResult(new ImmutableBytesWritable(buffer,0,length)));
             return next;
         }
-        
+
         @Override
         public synchronized Tuple peek() throws SQLException {
             try {
@@ -292,7 +293,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
                 throw ServerUtil.parseServerException(e);
             }
         }
-    
+
         @Override
         public synchronized Tuple next() throws SQLException {
             try {
@@ -304,7 +305,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
                 throw ServerUtil.parseServerException(e);
             }
         }
-        
+
         @Override
         public synchronized void close() throws SQLException {
             try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/173c7d72/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 95ec53b..9594f33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -46,7 +46,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String SPOOL_THRESHOLD_BYTES_ATTRIB = "phoenix.query.spoolThresholdBytes";
     public static final String HBASE_CLIENT_KEYTAB = "hbase.myclient.keytab";
     public static final String HBASE_CLIENT_PRINCIPAL = "hbase.myclient.principal";
-
+    public static final String SPOOL_DIRECTORY = "phoenix.spool.directory";
     
     /**
 	 * max size to spool the the result into

http://git-wip-us.apache.org/repos/asf/phoenix/blob/173c7d72/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index e6cd94e..605d44c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -48,6 +48,7 @@ import static org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE;
 import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
 import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB;
@@ -77,6 +78,7 @@ public class QueryServicesOptions {
 	public static final int DEFAULT_QUEUE_SIZE = 500;
 	public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min
 	public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
+    public static final String DEFAULT_SPOOL_DIRECTORY = "/tmp";
 	public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap
 	public static final int DEFAULT_MAX_MEMORY_WAIT_MS = 10000;
 	public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100;
@@ -178,6 +180,7 @@ public class QueryServicesOptions {
             .setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE)
             .setIfUnset(THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS)
             .setIfUnset(SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_SPOOL_THRESHOLD_BYTES)
+            .setIfUnset(SPOOL_DIRECTORY, DEFAULT_SPOOL_DIRECTORY)
             .setIfUnset(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC)
             .setIfUnset(MAX_MEMORY_WAIT_MS_ATTRIB, DEFAULT_MAX_MEMORY_WAIT_MS)
             .setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC)
@@ -232,7 +235,7 @@ public class QueryServicesOptions {
         config.setIfUnset(name, Long.toString(value));
         return this;
     }
-    
+
     private QueryServicesOptions setIfUnset(String name, String value) {
         config.setIfUnset(name, value);
         return this;
@@ -258,7 +261,11 @@ public class QueryServicesOptions {
     public QueryServicesOptions setSpoolThresholdBytes(int spoolThresholdBytes) {
         return set(SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes);
     }
-    
+
+    public QueryServicesOptions setSpoolDirectory(String spoolDirectory) {
+        return set(SPOOL_DIRECTORY, spoolDirectory);
+    }
+
     public QueryServicesOptions setMaxMemoryPerc(int maxMemoryPerc) {
         return set(MAX_MEMORY_PERC_ATTRIB, maxMemoryPerc);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/173c7d72/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
index e1b6864..ab6a4a7 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
@@ -52,7 +52,7 @@ public class SpoolingResultIteratorTest {
             };
 
         MemoryManager memoryManager = new DelegatingMemoryManager(new GlobalMemoryManager(threshold, 0));
-        ResultIterator scanner = new SpoolingResultIterator(iterator, memoryManager, threshold, maxSizeSpool);
+        ResultIterator scanner = new SpoolingResultIterator(iterator, memoryManager, threshold, maxSizeSpool,"/tmp");
         AssertResults.assertResults(scanner, expectedResults);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/173c7d72/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index e125755..47f5b1b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.query;
 
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY;
 import static org.apache.phoenix.query.QueryServicesOptions.withDefaults;
 
 import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
@@ -64,6 +65,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
                 .setMaxMemoryPerc(DEFAULT_MAX_MEMORY_PERC)
                 .setThreadTimeoutMs(DEFAULT_THREAD_TIMEOUT_MS)
                 .setSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES)
+                .setSpoolDirectory(DEFAULT_SPOOL_DIRECTORY)
                 .setMaxMemoryWaitMs(DEFAULT_MAX_MEMORY_WAIT_MS)
                 .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC)
                 .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE)


Mime
View raw message