Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 57787200C0F for ; Thu, 2 Feb 2017 10:33:00 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 56041160B54; Thu, 2 Feb 2017 09:33:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B17F5160B57 for ; Thu, 2 Feb 2017 10:32:56 +0100 (CET) Received: (qmail 78618 invoked by uid 500); 2 Feb 2017 09:32:55 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 78609 invoked by uid 99); 2 Feb 2017 09:32:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Feb 2017 09:32:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0BD81DFC12; Thu, 2 Feb 2017 09:32:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Message-Id: <404646f6ae22494fba008ca58a12b192@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: IGNITE-4582: Async API: IgniteFileSystem refactoring. Date: Thu, 2 Feb 2017 09:32:55 +0000 (UTC) archived-at: Thu, 02 Feb 2017 09:33:00 -0000 Repository: ignite Updated Branches: refs/heads/ignite-4475-async a5969cd3f -> 5cf6dc4d9 IGNITE-4582: Async API: IgniteFileSystem refactoring. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5cf6dc4d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5cf6dc4d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5cf6dc4d Branch: refs/heads/ignite-4475-async Commit: 5cf6dc4d9461eeae1f6eae4210e28094637e6c08 Parents: a5969cd Author: devozerov Authored: Thu Feb 2 12:32:38 2017 +0300 Committer: devozerov Committed: Thu Feb 2 12:32:38 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteFileSystem.java | 76 ++++++++++++++++++++ .../internal/processors/igfs/IgfsAsyncImpl.java | 43 +++++++++-- .../internal/processors/igfs/IgfsImpl.java | 62 +++++++++++++--- .../internal/processors/igfs/IgfsMock.java | 36 ++++++++++ .../processors/igfs/IgfsTaskSelfTest.java | 19 +++++ 5 files changed, 220 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf6dc4d/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java b/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java index 8fb4fcd..78c86dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java @@ -32,6 +32,7 @@ import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver; import org.apache.ignite.igfs.mapreduce.IgfsTask; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import org.apache.ignite.igfs.IgfsPathNotFoundException; @@ -275,6 +276,15 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { public void format() throws IgniteException; /** + * Asynchronously formats the file system removing all existing entries from it. + *

+ * + * @return a Future representing pending completion of the format operation. + * @throws IgniteException In case format has failed. + */ + public IgniteFuture formatAsync() throws IgniteException; + + /** * Executes IGFS task. *

* Supports asynchronous execution (see {@link IgniteAsyncSupport}). @@ -291,6 +301,20 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { Collection paths, @Nullable T arg) throws IgniteException; /** + * Executes IGFS task asynchronously. + * + * @param task Task to execute. + * @param rslvr Optional resolver to control split boundaries. + * @param paths Collection of paths to be processed within this task. + * @param arg Optional task argument. + * @return a Future representing pending completion of the task. + * @throws IgniteException If execution failed. + */ + public IgniteFuture executeAsync(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + Collection paths, @Nullable T arg) throws IgniteException; + + + /** * Executes IGFS task with overridden maximum range length (see * {@link org.apache.ignite.configuration.FileSystemConfiguration#getMaximumTaskRangeLength()} for more information). *

@@ -313,6 +337,25 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { throws IgniteException; /** + * Executes IGFS task asynchronously with overridden maximum range length (see + * {@link org.apache.ignite.configuration.FileSystemConfiguration#getMaximumTaskRangeLength()} for more information). + * + * @param task Task to execute. + * @param rslvr Optional resolver to control split boundaries. + * @param paths Collection of paths to be processed within this task. + * @param skipNonExistentFiles Whether to skip non existent files. If set to {@code true} non-existent files will + * be ignored. Otherwise an exception will be thrown. + * @param maxRangeLen Optional maximum range length. If {@code 0}, then by default all consecutive + * IGFS blocks will be included. + * @param arg Optional task argument. + * @return a Future representing pending completion of the task. + * @throws IgniteException If execution failed. + */ + public IgniteFuture executeAsync(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + Collection paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) + throws IgniteException; + + /** * Executes IGFS task. *

* Supports asynchronous execution (see {@link IgniteAsyncSupport}). @@ -329,6 +372,20 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) throws IgniteException; /** + * Executes IGFS task asynchronously. + * + * @param taskCls Task class to execute. + * @param rslvr Optional resolver to control split boundaries. + * @param paths Collection of paths to be processed within this task. + * @param arg Optional task argument. + * @return a Future representing pending completion of the task. + * @throws IgniteException If execution failed. + */ + public IgniteFuture executeAsync(Class> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) throws IgniteException; + + + /** * Executes IGFS task with overridden maximum range length (see * {@link org.apache.ignite.configuration.FileSystemConfiguration#getMaximumTaskRangeLength()} for more information). *

@@ -350,6 +407,24 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { long maxRangeLen, @Nullable T arg) throws IgniteException; /** + * Executes IGFS task asynchronously with overridden maximum range length (see + * {@link org.apache.ignite.configuration.FileSystemConfiguration#getMaximumTaskRangeLength()} for more information). + * + * @param taskCls Task class to execute. + * @param rslvr Optional resolver to control split boundaries. + * @param paths Collection of paths to be processed within this task. + * @param skipNonExistentFiles Whether to skip non existent files. If set to {@code true} non-existent files will + * be ignored. Otherwise an exception will be thrown. + * @param maxRangeLen Maximum range length. + * @param arg Optional task argument. + * @return a Future representing pending completion of the task. + * @throws IgniteException If execution failed. + */ + public IgniteFuture executeAsync(Class> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, + long maxRangeLen, @Nullable T arg) throws IgniteException; + + /** * Checks if the specified path exists in the file system. * * @param path Path to check for existence in the file system. @@ -473,5 +548,6 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { public long usedSpaceSize() throws IgniteException; /** {@inheritDoc} */ + @Deprecated @Override public IgniteFileSystem withAsync(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf6dc4d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java index 106ef60..b5289a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.Collection; import java.util.Map; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.igfs.IgfsBlockLocation; @@ -36,6 +37,7 @@ import org.apache.ignite.igfs.mapreduce.IgfsTask; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.internal.AsyncSupportAdapter; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -58,7 +60,7 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter impleme /** {@inheritDoc} */ @Override public void format() { try { - saveOrGet(igfs.formatAsync()); + saveOrGet(igfs.formatAsync0()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -66,10 +68,15 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter impleme } /** {@inheritDoc} */ + @Override public IgniteFuture formatAsync() throws IgniteException { + return igfs.formatAsync(); + } + + /** {@inheritDoc} */ @Override public R execute(IgfsTask task, @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) { try { - return saveOrGet(igfs.executeAsync(task, rslvr, paths, arg)); + return saveOrGet(igfs.executeAsync0(task, rslvr, paths, arg)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -77,10 +84,16 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter impleme } /** {@inheritDoc} */ + @Override public IgniteFuture executeAsync(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + Collection paths, @Nullable T arg) throws IgniteException { + return igfs.executeAsync(task, rslvr, paths, arg); + } + + /** {@inheritDoc} */ @Override public R execute(IgfsTask task, @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { try { - return saveOrGet(igfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + return saveOrGet(igfs.executeAsync0(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -88,10 +101,17 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter impleme } /** {@inheritDoc} */ + @Override public IgniteFuture executeAsync(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + Collection paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + return igfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg); + } + + /** {@inheritDoc} */ @Override public R execute(Class> taskCls, @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) { try { - return saveOrGet(igfs.executeAsync(taskCls, rslvr, paths, arg)); + return saveOrGet(igfs.executeAsync0(taskCls, rslvr, paths, arg)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -99,11 +119,17 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter impleme } /** {@inheritDoc} */ + @Override public IgniteFuture executeAsync(Class> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) throws IgniteException { + return igfs.executeAsync(taskCls, rslvr, paths, arg); + } + + /** {@inheritDoc} */ @Override public R execute(Class> taskCls, @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { try { - return saveOrGet(igfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + return saveOrGet(igfs.executeAsync0(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -111,6 +137,13 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter impleme } /** {@inheritDoc} */ + @Override public IgniteFuture executeAsync(Class> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + return igfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg); + } + + /** {@inheritDoc} */ @Override public void stop(boolean cancel) { igfs.stop(cancel); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf6dc4d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 59674f8..e2664f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -82,6 +82,7 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallabl import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; @@ -1424,12 +1425,17 @@ public final class IgfsImpl implements IgfsEx { } } + /** {@inheritDoc} */ + @Override public IgniteFuture formatAsync() throws IgniteException { + return (IgniteFuture)createFuture(formatAsync0()); + } + /** * Formats the file system removing all existing entries from it. * * @return Future. */ - IgniteInternalFuture formatAsync() { + IgniteInternalFuture formatAsync0() { GridFutureAdapter fut = new GridFutureAdapter<>(); Thread t = new Thread(new FormatRunnable(fut), "igfs-format-" + cfg.getName() + "-" + @@ -1446,7 +1452,7 @@ public final class IgfsImpl implements IgfsEx { @Override public R execute(IgfsTask task, @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) { try { - return executeAsync(task, rslvr, paths, arg).get(); + return executeAsync0(task, rslvr, paths, arg).get(); } catch (Exception e) { throw IgfsUtils.toIgfsException(e); @@ -1454,10 +1460,16 @@ public final class IgfsImpl implements IgfsEx { } /** {@inheritDoc} */ + @Override public IgniteFuture executeAsync(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + Collection paths, @Nullable T arg) throws IgniteException { + return createFuture(executeAsync0(task, rslvr, paths, arg)); + } + + /** {@inheritDoc} */ @Override public R execute(IgfsTask task, @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { try { - return executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg).get(); + return executeAsync0(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg).get(); } catch (Exception e) { throw IgfsUtils.toIgfsException(e); @@ -1465,10 +1477,17 @@ public final class IgfsImpl implements IgfsEx { } /** {@inheritDoc} */ + @Override public IgniteFuture executeAsync(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + Collection paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + return createFuture(executeAsync0(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + } + + /** {@inheritDoc} */ @Override public R execute(Class> taskCls, @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) { try { - return executeAsync(taskCls, rslvr, paths, arg).get(); + return executeAsync0(taskCls, rslvr, paths, arg).get(); } catch (Exception e) { throw IgfsUtils.toIgfsException(e); @@ -1476,17 +1495,30 @@ public final class IgfsImpl implements IgfsEx { } /** {@inheritDoc} */ + @Override public IgniteFuture executeAsync(Class> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) throws IgniteException { + return createFuture(executeAsync0(taskCls, rslvr, paths, arg)); + } + + /** {@inheritDoc} */ @Override public R execute(Class> taskCls, @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, long maxRangeSize, @Nullable T arg) { try { - return executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeSize, arg).get(); + return executeAsync0(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeSize, arg).get(); } catch (Exception e) { throw IgfsUtils.toIgfsException(e); } } + /** {@inheritDoc} */ + @Override public IgniteFuture executeAsync(Class> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + return createFuture(executeAsync0(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + } + /** * Executes IGFS task asynchronously. * @@ -1496,9 +1528,9 @@ public final class IgfsImpl implements IgfsEx { * @param arg Optional task argument. * @return Execution future. */ - IgniteInternalFuture executeAsync(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + IgniteInternalFuture executeAsync0(IgfsTask task, @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) { - return executeAsync(task, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg); + return executeAsync0(task, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg); } /** @@ -1515,7 +1547,7 @@ public final class IgfsImpl implements IgfsEx { * @param arg Optional task argument. * @return Execution future. */ - IgniteInternalFuture executeAsync(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + IgniteInternalFuture executeAsync0(IgfsTask task, @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { return igfsCtx.kernalContext().task().execute(task, new IgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr, skipNonExistentFiles, maxRangeLen, arg)); @@ -1530,9 +1562,9 @@ public final class IgfsImpl implements IgfsEx { * @param arg Optional task argument. * @return Execution future. */ - IgniteInternalFuture executeAsync(Class> taskCls, + IgniteInternalFuture executeAsync0(Class> taskCls, @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) { - return executeAsync(taskCls, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg); + return executeAsync0(taskCls, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg); } /** @@ -1549,7 +1581,7 @@ public final class IgfsImpl implements IgfsEx { * @return Execution future. */ @SuppressWarnings("unchecked") - IgniteInternalFuture executeAsync(Class> taskCls, + IgniteInternalFuture executeAsync0(Class> taskCls, @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { return igfsCtx.kernalContext().task().execute((Class>)taskCls, @@ -1774,6 +1806,14 @@ public final class IgfsImpl implements IgfsEx { } /** + * @param fut Internal future. + * @return Public API future. + */ + private IgniteFuture createFuture(IgniteInternalFuture fut) { + return new IgniteFutureImpl<>(fut); + } + + /** * IGFS thread factory. */ @SuppressWarnings("NullableProblems") http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf6dc4d/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java index 04c67dc..78fbf3b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java @@ -265,6 +265,12 @@ public class IgfsMock implements IgfsEx { throwUnsupported(); } + @Override public IgniteFuture formatAsync() throws IgniteException { + throwUnsupported(); + + return null; + } + /** {@inheritDoc} */ @Override public R execute(IgfsTask task, @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) throws IgniteException { @@ -273,6 +279,13 @@ public class IgfsMock implements IgfsEx { return null; } + @Override public IgniteFuture executeAsync(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + Collection paths, @Nullable T arg) throws IgniteException { + throwUnsupported(); + + return null; + } + /** {@inheritDoc} */ @Override public R execute(IgfsTask task, @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) @@ -282,6 +295,14 @@ public class IgfsMock implements IgfsEx { return null; } + @Override public IgniteFuture executeAsync(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + Collection paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + throwUnsupported(); + + return null; + } + /** {@inheritDoc} */ @Override public R execute(Class> taskCls, @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) throws IgniteException { @@ -290,6 +311,13 @@ public class IgfsMock implements IgfsEx { return null; } + @Override public IgniteFuture executeAsync(Class> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) throws IgniteException { + throwUnsupported(); + + return null; + } + /** {@inheritDoc} */ @Override public R execute(Class> taskCls, @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) @@ -299,6 +327,14 @@ public class IgfsMock implements IgfsEx { return null; } + @Override public IgniteFuture executeAsync(Class> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + throwUnsupported(); + + return null; + } + /** {@inheritDoc} */ @Override public boolean exists(IgfsPath path) { throwUnsupported(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf6dc4d/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java index 7b972c3..b87879a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java @@ -170,6 +170,25 @@ public class IgfsTaskSelfTest extends IgfsCommonAbstractTest { } /** + * Test task. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ConstantConditions") + public void testTaskAsync() throws Exception { + String arg = DICTIONARY[new Random(System.currentTimeMillis()).nextInt(DICTIONARY.length)]; + + generateFile(TOTAL_WORDS); + Long genLen = igfs.info(FILE).length(); + + IgniteBiTuple taskRes = igfs.executeAsync(new Task(), + new IgfsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg).get(); + + assert F.eq(genLen, taskRes.getKey()); + assert F.eq(TOTAL_WORDS, taskRes.getValue()); + } + + /** * Generate file with random data and provided argument. * * @param wordCnt Word count.