tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject tajo git commit: TAJO-2022: Add AsyncTaskServer to TajoMaster.
Date Thu, 17 Dec 2015 03:18:01 GMT
Repository: tajo
Updated Branches:
  refs/heads/master cc1efb66d -> 1515e388a


TAJO-2022: Add AsyncTaskServer to TajoMaster.

Closes #913


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1515e388
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1515e388
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1515e388

Branch: refs/heads/master
Commit: 1515e388a764a55b7b37ce2c23ddfab21fe8115d
Parents: cc1efb6
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Wed Dec 16 19:12:39 2015 -0800
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Wed Dec 16 19:12:39 2015 -0800

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   7 ++
 .../tajo/cli/tsql/TestTajoCliNegatives.java     |   2 +-
 .../org/apache/tajo/io/AsyncTaskService.java    | 109 +++++++++++++++++++
 .../java/org/apache/tajo/master/TajoMaster.java |   9 ++
 .../tajo/master/TajoMasterClientService.java    |  24 ++--
 .../exec/NonForwardQueryResultFileScanner.java  | 102 ++++++++---------
 .../apache/tajo/master/exec/QueryExecutor.java  |  10 +-
 .../ws/rs/resources/QueryResultResource.java    |  11 +-
 9 files changed, 206 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 65353b1..406e56a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-2022: Add AsyncTaskServer to TajoMaster. (hyunsik)
+
     TAJO-1990: Refine some parts in HBaseTablespace. (hyunsik)
 
     TAJO-2005: Add TableStatUpdateRewriter. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 9f788eb..b2e08bd 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -152,6 +152,13 @@ public class TajoConf extends Configuration {
     DEFAULT_SERVICE_TRACKER_CLASS("tajo.discovery.service-tracker.class", BaseServiceTracker.class.getCanonicalName()),
     HA_SERVICE_TRACKER_CLASS("tajo.discovery.ha-service-tracker.class", "org.apache.tajo.ha.HdfsServiceTracker"),
 
+    // Async IO Task Service
+
+    /** The number of threads for async tasks */
+    MASTER_ASYNC_TASK_THREAD_NUM("tajo.master.async-task.thread-num", 4),
+    /** How long it will wait for termination */
+    MASTER_ASYNC_TASK_TERMINATION_WAIT_TIME("tajo.master.async-task.wait-time-sec", 60),
// 1 min
+
     // Resource tracker service
     RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003",
         Validators.networkAddr()),

http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
index bf4ffcf..6d939de 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
@@ -134,7 +134,7 @@ public class TestTajoCliNegatives extends QueryTestCaseBase {
     assertScriptFailure("select fail(3, l_orderkey, 'testQueryFailureOfSimpleQuery') from
default.lineitem" ,
         "?fail\n" +
             "-------------------------------\n" +
-            "ERROR: internal error: internal error: testQueryFailureOfSimpleQuery\n");
+            "ERROR: internal error: internal error: internal error: testQueryFailureOfSimpleQuery\n");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/io/AsyncTaskService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/io/AsyncTaskService.java b/tajo-core/src/main/java/org/apache/tajo/io/AsyncTaskService.java
new file mode 100644
index 0000000..71faaee
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/io/AsyncTaskService.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.util.TUtil;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * AsyncTaskService executes some blocking tasks in TajoMaster
+ *
+ * @See https://issues.apache.org/jira/browse/TAJO-2022
+ */
+public class AsyncTaskService extends AbstractService {
+  private final TajoMaster.MasterContext context;
+  private long TERMINATION_WAIT_TIME_SEC;
+  private ExecutorService executor;
+
+  /**
+   * Construct the service.
+   *
+   * @param context
+   */
+  public AsyncTaskService(TajoMaster.MasterContext context) {
+    super("MasterAsyncTaskExecutor");
+    this.context = context;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    TajoConf systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+    TERMINATION_WAIT_TIME_SEC = systemConf.getLongVar(ConfVars.MASTER_ASYNC_TASK_TERMINATION_WAIT_TIME);
+    executor = Executors.newFixedThreadPool(systemConf.getIntVar(ConfVars.MASTER_ASYNC_TASK_THREAD_NUM));
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    executor.shutdown();
+    boolean terminated = false;
+    try {
+      terminated = executor.awaitTermination(TERMINATION_WAIT_TIME_SEC, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+    }
+    if (!terminated) {
+      executor.shutdownNow();
+    }
+
+    super.serviceStop();
+  }
+
+  public TajoMaster.MasterContext getMasterContext() {
+    return this.context;
+  }
+
+  /**
+   * Returns a new CompletableFuture that is asynchronously completed
+   * by a task running in AsyncTaskService.
+   *
+   * @param task Task
+   * @param <T> Return Type
+   * @return CompletableFuture
+   */
+  public <T> CompletableFuture<T> supply(Supplier<T> task) {
+    return CompletableFuture.supplyAsync(task, executor);
+  }
+
+  /**
+   * Returns a new CompletableFuture that is asynchronously completed
+   * by a task running in the given executor after it runs the given
+   * action.
+   *
+   * @param task Task
+   * @return CompletableFuture
+   */
+  public CompletableFuture<Void> run(Runnable task) {
+    return CompletableFuture.runAsync(task, executor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index a74573e..97e9613 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -48,6 +48,7 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.function.FunctionLoader;
 import org.apache.tajo.exception.*;
 import org.apache.tajo.function.FunctionSignature;
+import org.apache.tajo.io.AsyncTaskService;
 import org.apache.tajo.master.rm.TajoResourceManager;
 import org.apache.tajo.metrics.ClusterResourceMetricSet;
 import org.apache.tajo.metrics.Master;
@@ -119,6 +120,7 @@ public class TajoMaster extends CompositeService {
   private CatalogServer catalogServer;
   private CatalogService catalog;
   private GlobalEngine globalEngine;
+  private AsyncTaskService asyncTaskService;
   private AsyncDispatcher dispatcher;
   private TajoMasterClientService tajoMasterClientService;
   private QueryCoordinatorService tajoMasterService;
@@ -196,6 +198,9 @@ public class TajoMaster extends CompositeService {
     tajoMasterClientService = new TajoMasterClientService(context);
     addIfService(tajoMasterClientService);
 
+    asyncTaskService = new AsyncTaskService(context);
+    addIfService(asyncTaskService);
+
     tajoMasterService = new QueryCoordinatorService(context);
     addIfService(tajoMasterService);
 
@@ -501,6 +506,10 @@ public class TajoMaster extends CompositeService {
       return globalEngine;
     }
 
+    public AsyncTaskService asyncTaskExecutor() {
+      return asyncTaskService;
+    }
+
     public QueryCoordinatorService getTajoMasterService() {
       return tajoMasterService;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 017b17f..bb04229 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
@@ -62,10 +63,7 @@ import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.exception.ExceptionUtil.printStackTraceIfError;
@@ -558,13 +556,17 @@ public class TajoMasterClientService extends AbstractService {
             scanNode.init(resultTableDesc);
           }
 
-          if(request.hasCompressCodec()) {
-            queryResultScanner = new NonForwardQueryResultFileScanner(context.getConf(),
session.getSessionId(),
-                queryId, scanNode, Integer.MAX_VALUE, request.getCompressCodec());
-          } else {
-            queryResultScanner = new NonForwardQueryResultFileScanner(context.getConf(),
-                session.getSessionId(), queryId, scanNode, Integer.MAX_VALUE);
-          }
+          Optional<TajoProtos.CodecType> codecType =
+              request.hasCompressCodec() ? Optional.of(request.getCompressCodec()) : Optional.empty();
+
+          queryResultScanner = new NonForwardQueryResultFileScanner(
+              context.asyncTaskExecutor(),
+              context.getConf(),
+              session.getSessionId(),
+              queryId,
+              scanNode,
+              Integer.MAX_VALUE,
+              codecType);
 
           queryResultScanner.init();
           session.addNonForwardQueryResultScanner(queryResultScanner);

http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index 80275ce..8953315 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -40,6 +40,7 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
+import org.apache.tajo.io.AsyncTaskService;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.storage.*;
@@ -56,38 +57,34 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.Optional;
 import java.util.concurrent.Future;
 
 public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner {
   private final static Log LOG = LogFactory.getLog(NonForwardQueryResultFileScanner.class);
 
-  private QueryId queryId;
-  private String sessionId;
+  final private AsyncTaskService asyncTaskService;
+  final private QueryId queryId;
+  final private String sessionId;
   private ScanExec scanExec;
-  private TableDesc tableDesc;
-  private RowStoreEncoder rowEncoder;
-  private int maxRow;
+  final private TableDesc tableDesc;
+  final private RowStoreEncoder rowEncoder;
+  final private int maxRow;
   private boolean eof;
   private volatile long totalRows;
   private volatile int currentNumRows;
   private volatile boolean isStopped;
   private TaskAttemptContext taskContext;
-  private TajoConf tajoConf;
-  private ScanNode scanNode;
-  private CodecType codecType;
-  private ExecutorService executor;
+  final private TajoConf tajoConf;
+  final private ScanNode scanNode;
+  final private Optional<CodecType> codecType;
   private MemoryRowBlock rowBlock;
   private Future<MemoryRowBlock> nextFetch;
 
-  public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId,
ScanNode scanNode,
-                                          int maxRow) throws IOException {
-    this(tajoConf, sessionId, queryId, scanNode, maxRow, null);
-  }
-
-  public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId,
ScanNode scanNode,
-      int maxRow, CodecType codecType) throws IOException {
+  public NonForwardQueryResultFileScanner(AsyncTaskService asyncTaskService,
+                                          TajoConf tajoConf, String sessionId, QueryId queryId,
ScanNode scanNode,
+                                          int maxRow, Optional<CodecType> codecType)
throws IOException {
+    this.asyncTaskService = asyncTaskService;
     this.tajoConf = tajoConf;
     this.sessionId = sessionId;
     this.queryId = queryId;
@@ -159,19 +156,22 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
       rowBlock = null;
     }
 
-    if(executor != null) {
-      executor.shutdown();
-    }
-
     //remove temporal final output
     if (!tajoConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
       Path temporalResultDir = TajoConf.getTemporalResultDir(tajoConf, queryId);
+
       if (tableDesc.getUri().equals(temporalResultDir.toUri())) {
-        temporalResultDir.getFileSystem(tajoConf).delete(temporalResultDir.getParent(), true);
+        asyncTaskService.run(() -> {
+              try {
+                temporalResultDir.getFileSystem(tajoConf).delete(temporalResultDir.getParent(),
true);
+              } catch (IOException e) {
+                LOG.error(e);
+              }
+            }
+        );
       }
     }
 
-
     LOG.info(String.format("\"Sent result to client for %s, queryId: %s %s rows: %d",
         sessionId, queryId,
         codecType != null ? ", compression: " + codecType : "",
@@ -229,13 +229,13 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
         resultSetBuilder.setRows(rowBlock.rows());
         MemoryBlock memoryBlock = rowBlock.getMemory();
 
-        if (codecType != null) {
+        if (codecType.isPresent()) {
           byte[] uncompressedBytes = new byte[memoryBlock.readableBytes()];
           memoryBlock.getBuffer().getBytes(0, uncompressedBytes);
 
-          byte[] compressedBytes = CompressionUtil.compress(codecType, uncompressedBytes);
+          byte[] compressedBytes = CompressionUtil.compress(codecType.get(), uncompressedBytes);
           resultSetBuilder.setDecompressedLength(uncompressedBytes.length);
-          resultSetBuilder.setDecompressCodec(codecType);
+          resultSetBuilder.setDecompressCodec(codecType.get());
           resultSetBuilder.setSerializedTuples(ByteString.copyFrom(compressedBytes));
         } else {
           ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes());
@@ -272,42 +272,34 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
       return future;
     }
 
-    if (executor == null) {
-      executor = Executors.newSingleThreadExecutor();
-    }
-
-    executor.submit(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          rowBlock.clear();
-          int endRow = currentNumRows + fetchRowNum;
-          while (currentNumRows < endRow) {
-            Tuple tuple = scanExec.next();
-            if (tuple == null) {
+    return asyncTaskService.supply(() -> {
+      try {
+        rowBlock.clear();
+        int endRow = currentNumRows + fetchRowNum;
+        while (currentNumRows < endRow) {
+          Tuple tuple = scanExec.next();
+          if (tuple == null) {
+            eof = true;
+            break;
+          } else {
+            rowBlock.getWriter().addTuple(tuple);
+            currentNumRows++;
+            if (currentNumRows >= maxRow) {
               eof = true;
               break;
-            } else {
-              rowBlock.getWriter().addTuple(tuple);
-              currentNumRows++;
-              if (currentNumRows >= maxRow) {
-                eof = true;
-                break;
-              }
             }
           }
+        }
 
-          if (rowBlock.rows() > 0) {
-            totalRows += rowBlock.rows();
-          }
-
-          future.set(rowBlock);
-        } catch (Throwable t) {
-          future.setException(t);
+        if (rowBlock.rows() > 0) {
+          totalRows += rowBlock.rows();
         }
+
+        return rowBlock;
+      } catch (Throwable t) {
+        throw new TajoInternalError(t);
       }
     });
-    return future;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 6afa67a..8dddf2c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -72,6 +72,7 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static org.apache.tajo.exception.ReturnStateUtil.OK;
 import static org.apache.tajo.exception.ReturnStateUtil.errUndefinedDatabase;
@@ -302,7 +303,14 @@ public class QueryExecutor {
         plan.getRootBlock().getRoot());
 
     final NonForwardQueryResultScanner queryResultScanner = new NonForwardQueryResultFileScanner(
-        context.getConf(), session.getSessionId(), queryInfo.getQueryId(), scanNode, maxRow);
+        context.asyncTaskExecutor(),
+        context.getConf(),
+        session.getSessionId(),
+        queryInfo.getQueryId(),
+        scanNode,
+        maxRow,
+        Optional.empty());
+
     queryResultScanner.init();
 
     session.addNonForwardQueryResultScanner(queryResultScanner);

http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
index 2438060..93d397a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
@@ -50,6 +50,7 @@ import java.net.URI;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.List;
+import java.util.Optional;
 
 public class QueryResultResource {
   
@@ -133,8 +134,14 @@ public class QueryResultResource {
         scanNode.init(resultTableDesc);
       }
 
-      resultScanner = new NonForwardQueryResultFileScanner(masterContext.getConf(), session.getSessionId(),
queryId,
-          scanNode, Integer.MAX_VALUE);
+      resultScanner = new NonForwardQueryResultFileScanner(
+          masterContext.asyncTaskExecutor(),
+          masterContext.getConf(),
+          session.getSessionId(),
+          queryId,
+          scanNode,
+          Integer.MAX_VALUE,
+          Optional.empty());
       resultScanner.init();
       session.addNonForwardQueryResultScanner(resultScanner);
     }


Mime
View raw message