ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [50/50] [abbrv] incubator-ignite git commit: Merge branch 'sprint-2' into ignite-394
Date Tue, 10 Mar 2015 10:08:23 GMT
Merge branch 'sprint-2' into ignite-394

Conflicts:
	modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1de65eb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1de65eb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1de65eb0

Branch: refs/heads/ignite-394
Commit: 1de65eb06834f2df87b373bdd9e55371bb5bcbac
Parents: 1f2b802 40b96e1
Author: Artem Shutak <ashutak@gridgain.com>
Authored: Tue Mar 10 13:05:10 2015 +0300
Committer: Artem Shutak <ashutak@gridgain.com>
Committed: Tue Mar 10 13:05:10 2015 +0300

----------------------------------------------------------------------
 DISCLAIMER.txt                                  |   15 +
 README.txt                                      |   25 +
 RELEASE_NOTES.txt                               |   18 +
 assembly/release-base.xml                       |   20 +-
 assembly/release-fabric.xml                     |    5 -
 bin/ignitevisorcmd.bat                          |    7 +-
 bin/ignitevisorcmd.sh                           |    7 +-
 bin/include/target-classpath.bat                |    2 +-
 bin/include/target-classpath.sh                 |    2 +-
 docs/ignite_readme.md                           |  100 --
 docs/ignite_readme.pdf                          |  Bin 77136 -> 0 bytes
 docs/release_notes.md                           |   16 -
 docs/release_notes.pdf                          |  Bin 33174 -> 0 bytes
 docs/wiki/basic-concepts/async-support.md       |   75 -
 docs/wiki/basic-concepts/getting-started.md     |  218 ---
 docs/wiki/basic-concepts/ignite-life-cycel.md   |  105 --
 docs/wiki/basic-concepts/maven-setup.md         |   68 -
 docs/wiki/basic-concepts/what-is-ignite.md      |   31 -
 docs/wiki/basic-concepts/zero-deployment.md     |   56 -
 docs/wiki/clustering/aws-config.md              |   42 -
 docs/wiki/clustering/cluster-config.md          |  176 --
 docs/wiki/clustering/cluster-groups.md          |  210 ---
 docs/wiki/clustering/cluster.md                 |  128 --
 docs/wiki/clustering/leader-election.md         |   59 -
 docs/wiki/clustering/network-config.md          |  101 --
 docs/wiki/clustering/node-local-map.md          |   35 -
 docs/wiki/compute-grid/checkpointing.md         |  238 ---
 .../compute-grid/collocate-compute-and-data.md  |   29 -
 docs/wiki/compute-grid/compute-grid.md          |   56 -
 docs/wiki/compute-grid/compute-tasks.md         |  105 --
 docs/wiki/compute-grid/distributed-closures.md  |  107 --
 docs/wiki/compute-grid/executor-service.md      |   23 -
 docs/wiki/compute-grid/fault-tolerance.md       |   79 -
 docs/wiki/compute-grid/job-scheduling.md        |   69 -
 docs/wiki/compute-grid/load-balancing.md        |   59 -
 docs/wiki/data-grid/affinity-collocation.md     |   78 -
 docs/wiki/data-grid/automatic-db-integration.md |  102 --
 docs/wiki/data-grid/cache-modes.md              |  237 ---
 docs/wiki/data-grid/cache-queries.md            |  164 --
 docs/wiki/data-grid/data-grid.md                |   68 -
 docs/wiki/data-grid/data-loading.md             |   77 -
 docs/wiki/data-grid/evictions.md                |   86 -
 docs/wiki/data-grid/hibernate-l2-cache.md       |  173 --
 docs/wiki/data-grid/jcache.md                   |   99 --
 docs/wiki/data-grid/off-heap-memory.md          |  180 --
 docs/wiki/data-grid/persistent-store.md         |  111 --
 docs/wiki/data-grid/rebalancing.md              |  105 --
 docs/wiki/data-grid/transactions.md             |  127 --
 docs/wiki/data-grid/web-session-clustering.md   |  236 ---
 .../distributed-data-structures/atomic-types.md |   97 -
 .../countdownlatch.md                           |   24 -
 .../distributed-data-structures/id-generator.md |   40 -
 .../queue-and-set.md                            |  116 --
 .../distributed-events/automatic-batching.md    |   16 -
 docs/wiki/distributed-events/events.md          |  101 --
 docs/wiki/distributed-file-system/igfs.md       |    1 -
 docs/wiki/distributed-messaging/messaging.md    |   73 -
 docs/wiki/http/configuration.md                 |   58 -
 docs/wiki/http/rest-api.md                      | 1646 -----------------
 docs/wiki/release-notes/release-notes.md        |   13 -
 docs/wiki/service-grid/cluster-singletons.md    |   94 -
 docs/wiki/service-grid/service-configuration.md |   33 -
 docs/wiki/service-grid/service-example.md       |   94 -
 docs/wiki/service-grid/service-grid.md          |   62 -
 .../ComputeFibonacciContinuationExample.java    |   12 +-
 .../examples/datagrid/CacheApiExample.java      |    2 +-
 .../examples/ScalarContinuationExample.scala    |   12 +-
 .../client/suite/IgniteClientTestSuite.java     |    3 +-
 .../org/apache/ignite/IgniteFileSystem.java     |   24 +-
 .../apache/ignite/IgniteSystemProperties.java   |   35 +-
 .../org/apache/ignite/cache/CacheManager.java   |    1 -
 .../igfs/IgfsDirectoryNotEmptyException.java    |   42 +
 .../ignite/igfs/IgfsFileNotFoundException.java  |   44 -
 .../ignite/igfs/IgfsPathNotFoundException.java  |   44 +
 .../igfs/secondary/IgfsSecondaryFileSystem.java |   10 +-
 .../internal/ComputeTaskInternalFuture.java     |   44 +-
 .../ignite/internal/GridJobContextImpl.java     |    6 +-
 .../ignite/internal/GridJobSessionImpl.java     |    2 +-
 .../ignite/internal/GridKernalGatewayImpl.java  |   26 -
 .../ignite/internal/GridTaskSessionImpl.java    |    2 +-
 .../ignite/internal/IgniteInternalFuture.java   |   79 +-
 .../apache/ignite/internal/IgniteKernal.java    |   24 +-
 .../internal/client/GridClientFuture.java       |    9 +-
 .../client/impl/GridClientDataImpl.java         |    2 +-
 .../client/impl/GridClientFutureAdapter.java    |   26 +-
 .../connection/GridClientNioTcpConnection.java  |    5 +-
 .../impl/GridTcpRouterNioListenerAdapter.java   |    2 +-
 .../internal/cluster/IgniteClusterImpl.java     |    9 +-
 .../internal/executor/GridExecutorService.java  |    2 +-
 .../igfs/common/IgfsControlResponse.java        |    5 +-
 .../managers/communication/GridIoManager.java   |   43 +-
 .../discovery/GridDiscoveryManager.java         |   10 +-
 .../eventstorage/GridEventStorageManager.java   |    6 +-
 .../affinity/GridAffinityAssignmentCache.java   |   17 +-
 .../affinity/GridAffinityProcessor.java         |    2 +-
 .../processors/cache/GridCacheAdapter.java      |   91 +-
 .../cache/GridCacheAffinityManager.java         |    2 +-
 .../cache/GridCacheDeploymentManager.java       |    2 +-
 .../cache/GridCacheEvictionManager.java         |   27 +-
 .../processors/cache/GridCacheGateway.java      |    6 +
 .../processors/cache/GridCacheIoManager.java    |   77 +-
 .../processors/cache/GridCacheMapEntry.java     |   14 +-
 .../processors/cache/GridCacheMessage.java      |    7 -
 .../cache/GridCacheMultiTxFuture.java           |   54 +-
 .../processors/cache/GridCacheMvcc.java         |    3 +-
 .../processors/cache/GridCacheMvccManager.java  |   34 +-
 .../GridCachePartitionExchangeManager.java      |    6 +-
 .../cache/GridCachePreloaderAdapter.java        |    4 +-
 .../processors/cache/GridCacheProcessor.java    |    2 +-
 .../cache/GridCacheProjectionImpl.java          |   12 +-
 .../cache/GridCacheSharedContext.java           |    2 +-
 .../processors/cache/GridCacheUtils.java        |   11 +-
 .../processors/cache/IgniteCacheProxy.java      |    2 +-
 ...ridCacheOptimisticCheckPreparedTxFuture.java |   25 +-
 .../distributed/GridCacheTxFinishSync.java      |    2 +-
 .../GridDistributedCacheAdapter.java            |    4 +-
 .../GridDistributedTxRemoteAdapter.java         |    6 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |   22 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |    9 +-
 .../distributed/dht/GridDhtCacheEntry.java      |   14 +-
 .../distributed/dht/GridDhtEmbeddedFuture.java  |   43 +-
 .../distributed/dht/GridDhtFinishedFuture.java  |   22 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |   45 +-
 .../distributed/dht/GridDhtLocalPartition.java  |    4 +-
 .../distributed/dht/GridDhtLockFuture.java      |   35 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   35 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   38 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |    4 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   16 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   48 +-
 .../dht/GridPartitionedGetFuture.java           |   44 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   20 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   20 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   43 +-
 .../dht/colocated/GridDhtColocatedCache.java    |   30 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   43 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   43 +-
 .../preloader/GridDhtPartitionDemandPool.java   |    9 +-
 .../GridDhtPartitionsExchangeFuture.java        |   48 +-
 .../dht/preloader/GridDhtPreloader.java         |   22 +-
 .../distributed/near/GridNearAtomicCache.java   |    2 +-
 .../distributed/near/GridNearCacheAdapter.java  |    6 +-
 .../distributed/near/GridNearGetFuture.java     |   51 +-
 .../distributed/near/GridNearLockFuture.java    |   43 +-
 .../near/GridNearTransactionalCache.java        |    2 +-
 .../near/GridNearTxFinishFuture.java            |   32 +-
 .../cache/distributed/near/GridNearTxLocal.java |   83 +-
 .../near/GridNearTxPrepareFuture.java           |   48 +-
 .../processors/cache/local/GridLocalCache.java  |    2 +-
 .../cache/local/GridLocalLockFuture.java        |   23 +-
 .../processors/cache/local/GridLocalTx.java     |   10 +-
 .../cache/local/GridLocalTxFuture.java          |   59 +-
 .../local/atomic/GridLocalAtomicCache.java      |    8 +-
 .../GridCacheDistributedFieldsQueryFuture.java  |   13 +-
 .../query/GridCacheDistributedQueryFuture.java  |   11 -
 .../query/GridCacheDistributedQueryManager.java |    4 +-
 .../query/GridCacheFieldsQueryErrorFuture.java  |   53 -
 .../query/GridCacheLocalFieldsQueryFuture.java  |   13 +-
 .../cache/query/GridCacheLocalQueryFuture.java  |   15 +-
 .../cache/query/GridCacheQueryErrorFuture.java  |    5 +-
 .../query/GridCacheQueryFutureAdapter.java      |   14 +-
 .../cache/query/GridCacheQueryManager.java      |   11 +-
 .../cache/transactions/IgniteTxAdapter.java     |    2 +-
 .../cache/transactions/IgniteTxHandler.java     |   25 +-
 .../transactions/IgniteTxLocalAdapter.java      |  320 ++--
 .../cache/transactions/IgniteTxManager.java     |   18 +-
 .../transactions/TransactionProxyImpl.java      |    2 +-
 .../closure/GridClosureProcessor.java           |   38 +-
 .../continuous/GridContinuousProcessor.java     |   36 +-
 .../datastream/IgniteDataStreamerFuture.java    |   13 +-
 .../datastream/IgniteDataStreamerImpl.java      |   39 +-
 .../datastream/IgniteDataStreamerProcessor.java |    2 +-
 .../GridCacheAtomicSequenceImpl.java            |    4 +-
 .../processors/hadoop/HadoopNoopProcessor.java  |    2 +-
 .../processors/igfs/IgfsDataManager.java        |   36 +-
 .../processors/igfs/IgfsDeleteWorker.java       |    2 +-
 .../igfs/IgfsDirectoryNotEmptyException.java    |   44 -
 .../internal/processors/igfs/IgfsImpl.java      |   30 +-
 .../processors/igfs/IgfsInputStreamImpl.java    |   10 +-
 .../processors/igfs/IgfsIpcHandler.java         |    4 +-
 .../processors/igfs/IgfsMetaManager.java        |   22 +-
 .../processors/igfs/IgfsOutputStreamImpl.java   |    2 +-
 .../internal/processors/igfs/IgfsServer.java    |    3 +-
 .../processors/job/GridJobProcessor.java        |    4 +-
 .../processors/query/GridQueryProcessor.java    |    6 +-
 .../processors/resource/GridResourceUtils.java  |    4 +-
 .../processors/rest/GridRestProcessor.java      |   14 +-
 .../handlers/cache/GridCacheCommandHandler.java |    6 +-
 .../cache/GridCacheQueryCommandHandler.java     |    4 +-
 .../DataStructuresCommandHandler.java           |    4 +-
 .../handlers/task/GridTaskCommandHandler.java   |    6 +-
 .../top/GridTopologyCommandHandler.java         |    4 +-
 .../version/GridVersionCommandHandler.java      |    2 +-
 .../tcp/GridTcpMemcachedNioListener.java        |    6 +-
 .../protocols/tcp/GridTcpRestNioListener.java   |    6 +-
 .../service/GridServiceDeploymentFuture.java    |    9 +-
 .../service/GridServiceProcessor.java           |   10 +-
 .../GridStreamerStageExecutionFuture.java       |   32 +-
 .../processors/streamer/IgniteStreamerImpl.java |   23 +-
 .../internal/util/GridSerializableFuture.java   |   28 -
 .../ignite/internal/util/GridThreadLocal.java   |  175 --
 .../ignite/internal/util/GridThreadLocalEx.java |  210 ---
 .../ignite/internal/util/IgniteUtils.java       |   36 +-
 .../util/future/GridCompoundFuture.java         |   52 +-
 .../util/future/GridCompoundIdentityFuture.java |   18 +-
 .../util/future/GridEmbeddedFuture.java         |   77 +-
 .../util/future/GridFinishedFuture.java         |  158 +-
 .../util/future/GridFinishedFutureEx.java       |  197 ---
 .../internal/util/future/GridFutureAdapter.java |  365 +---
 .../util/future/GridFutureAdapterEx.java        |  517 ------
 .../util/future/GridFutureChainListener.java    |   18 +-
 .../util/future/IgniteFinishedFutureImpl.java   |   27 +-
 .../util/future/IgniteFinishedFutureImplEx.java |   30 -
 .../internal/util/future/IgniteFutureImpl.java  |   31 +-
 .../internal/util/io/GridFilenameUtils.java     |    2 +-
 .../ignite/internal/util/lang/GridFunc.java     |   90 +-
 .../internal/util/lang/GridPlainFuture.java     |   79 -
 .../util/lang/GridPlainFutureAdapter.java       |  299 ----
 .../util/nio/GridNioEmbeddedFuture.java         |   12 +-
 .../util/nio/GridNioFinishedFuture.java         |   77 +-
 .../ignite/internal/util/nio/GridNioFuture.java |   84 +-
 .../internal/util/nio/GridNioFutureImpl.java    |  282 +--
 .../ignite/internal/util/nio/GridNioServer.java |    7 +-
 .../util/nio/GridTcpNioCommunicationClient.java |   18 +-
 .../ignite/internal/util/worker/GridWorker.java |   27 -
 .../internal/util/worker/GridWorkerFuture.java  |   20 -
 .../visor/cache/VisorCacheClearTask.java        |    2 +-
 .../visor/node/VisorGridConfiguration.java      |    6 +-
 .../org/apache/ignite/lang/IgniteFuture.java    |   67 +-
 .../lang/IgniteFutureCancelledException.java    |    3 -
 .../lang/IgniteFutureTimeoutException.java      |    3 -
 .../communication/tcp/TcpCommunicationSpi.java  |   32 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |    2 +-
 .../igfs/IgfsFragmentizerAbstractSelfTest.java  |    4 +-
 .../internal/GridMultipleJobsSelfTest.java      |    2 +-
 .../GridTaskFutureImplStopGridSelfTest.java     |    2 +-
 .../internal/GridTaskListenerSelfTest.java      |    2 +-
 .../GridCacheAsyncOperationsLimitSelfTest.java  |    3 +-
 ...dCacheAtomicUsersAffinityMapperSelfTest.java |    7 +-
 .../GridCacheFinishPartitionsSelfTest.java      |    6 +-
 .../GridCachePartitionedLocalStoreSelfTest.java |    7 -
 ...chePartitionedOffHeapLocalStoreSelfTest.java |    7 -
 .../cache/GridCachePutAllFailoverSelfTest.java  |    6 +-
 .../GridCacheReferenceCleanupSelfTest.java      |    2 +-
 .../GridCacheReplicatedLocalStoreSelfTest.java  |    7 -
 ...heReplicatedUsersAffinityMapperSelfTest.java |    7 +-
 ...ridCacheTxPartitionedLocalStoreSelfTest.java |    7 -
 .../GridCacheTxUsersAffinityMapperSelfTest.java |    7 +-
 .../distributed/GridCacheEventAbstractTest.java |   17 +-
 .../processors/igfs/IgfsAbstractSelfTest.java   |   28 +-
 .../igfs/IgfsDualAbstractSelfTest.java          |   18 +-
 .../cache/GridCacheCommandHandlerSelfTest.java  |   42 +-
 .../util/future/GridCompoundFutureSelfTest.java |   30 +-
 .../util/future/GridEmbeddedFutureSelfTest.java |   13 +-
 .../util/future/GridFinishedFutureSelfTest.java |  103 --
 .../util/future/GridFutureAdapterSelfTest.java  |  115 +-
 .../future/GridFutureListenPerformanceTest.java |   22 +-
 .../util/future/IgniteFutureImplTest.java       |   99 +-
 .../util/future/nio/GridNioFutureSelfTest.java  |    8 +-
 .../lang/GridFutureListenPerformanceTest.java   |    2 +-
 .../loadtests/colocation/GridTestMain.java      |    2 +-
 ...GridJobExecutionLoadTestClientSemaphore.java |    2 +-
 ...JobExecutionSingleNodeSemaphoreLoadTest.java |    2 +-
 .../mergesort/GridMergeSortLoadTask.java        |    2 +-
 .../ignite/messaging/GridMessagingSelfTest.java |   12 +-
 .../GridCacheStoreValueBytesTest.java           |    4 +-
 .../ignite/testframework/GridTestUtils.java     |   13 +-
 .../testsuites/IgniteLangSelfTestSuite.java     |    1 -
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java |   10 +-
 .../processors/hadoop/igfs/HadoopIgfsEx.java    |    4 +-
 .../hadoop/igfs/HadoopIgfsFuture.java           |    4 +-
 .../hadoop/igfs/HadoopIgfsInProc.java           |    9 +-
 .../hadoop/igfs/HadoopIgfsInputStream.java      |    6 +-
 .../processors/hadoop/igfs/HadoopIgfsIo.java    |    6 +-
 .../processors/hadoop/igfs/HadoopIgfsIpcIo.java |    5 +-
 .../hadoop/igfs/HadoopIgfsOutProc.java          |   50 +-
 .../processors/hadoop/igfs/HadoopIgfsUtils.java |    3 +-
 .../hadoop/jobtracker/HadoopJobTracker.java     |   20 +-
 .../proto/HadoopProtocolJobStatusTask.java      |    2 +-
 .../hadoop/shuffle/HadoopShuffle.java           |    4 +-
 .../hadoop/shuffle/HadoopShuffleJob.java        |   16 +-
 .../external/HadoopExternalTaskExecutor.java    |   28 +-
 .../child/HadoopChildProcessRunner.java         |   10 +-
 .../HadoopExternalCommunication.java            |    7 +-
 .../HadoopTcpNioCommunicationClient.java        |   12 +-
 .../java/org/apache/ignite/igfs/IgfsLoad.java   |  549 ------
 .../ignite/loadtests/igfs/IgfsNodeStartup.java  |   48 -
 .../igfs/IgfsPerformanceBenchmark.java          |  281 ---
 .../processors/query/h2/IgniteH2Indexing.java   |    2 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |    8 +-
 .../cache/jta/GridCacheXAResource.java          |    9 +-
 .../processors/schedule/ScheduleFutureImpl.java |  205 +--
 .../schedule/GridScheduleSelfTest.java          |    4 +-
 .../ignite/visor/commands/VisorConsole.scala    |   61 +-
 .../visor/commands/ack/VisorAckCommand.scala    |    4 +-
 .../visor/commands/gc/VisorGcCommand.scala      |    2 -
 .../visor/commands/ping/VisorPingCommand.scala  |    2 +-
 .../scala/org/apache/ignite/visor/visor.scala   |   34 +-
 pom.xml                                         |   33 +
 .../basic-concepts/async-support.md             |   92 +
 .../basic-concepts/getting-started.md           |  235 +++
 .../basic-concepts/ignite-life-cycel.md         |  122 ++
 .../documentation/basic-concepts/maven-setup.md |   85 +
 .../basic-concepts/what-is-ignite.md            |   48 +
 .../basic-concepts/zero-deployment.md           |   73 +
 wiki/documentation/clustering/aws-config.md     |   59 +
 wiki/documentation/clustering/cluster-config.md |  193 ++
 wiki/documentation/clustering/cluster-groups.md |  227 +++
 wiki/documentation/clustering/cluster.md        |  145 ++
 .../documentation/clustering/leader-election.md |   76 +
 wiki/documentation/clustering/network-config.md |  118 ++
 wiki/documentation/clustering/node-local-map.md |   52 +
 .../documentation/compute-grid/checkpointing.md |  255 +++
 .../compute-grid/collocate-compute-and-data.md  |   46 +
 wiki/documentation/compute-grid/compute-grid.md |   73 +
 .../documentation/compute-grid/compute-tasks.md |  122 ++
 .../compute-grid/distributed-closures.md        |  124 ++
 .../compute-grid/executor-service.md            |   40 +
 .../compute-grid/fault-tolerance.md             |   96 +
 .../compute-grid/job-scheduling.md              |   86 +
 .../compute-grid/load-balancing.md              |   76 +
 .../data-grid/affinity-collocation.md           |   95 +
 .../data-grid/automatic-db-integration.md       |  119 ++
 wiki/documentation/data-grid/cache-modes.md     |  254 +++
 wiki/documentation/data-grid/cache-queries.md   |  181 ++
 wiki/documentation/data-grid/data-grid.md       |   85 +
 wiki/documentation/data-grid/data-loading.md    |   94 +
 wiki/documentation/data-grid/evictions.md       |  103 ++
 .../data-grid/hibernate-l2-cache.md             |  190 ++
 wiki/documentation/data-grid/jcache.md          |  116 ++
 wiki/documentation/data-grid/off-heap-memory.md |  197 +++
 .../documentation/data-grid/persistent-store.md |  128 ++
 wiki/documentation/data-grid/rebalancing.md     |  122 ++
 wiki/documentation/data-grid/transactions.md    |  144 ++
 .../data-grid/web-session-clustering.md         |  253 +++
 .../distributed-data-structures/atomic-types.md |  114 ++
 .../countdownlatch.md                           |   41 +
 .../distributed-data-structures/id-generator.md |   57 +
 .../queue-and-set.md                            |  133 ++
 .../distributed-events/automatic-batching.md    |   33 +
 wiki/documentation/distributed-events/events.md |  118 ++
 .../distributed-file-system/igfs.md             |   18 +
 .../distributed-messaging/messaging.md          |   90 +
 wiki/documentation/http/configuration.md        |   67 +
 wiki/documentation/http/rest-api.md             | 1663 ++++++++++++++++++
 .../release-notes/release-notes.md              |   30 +
 .../service-grid/cluster-singletons.md          |  111 ++
 .../service-grid/service-configuration.md       |   50 +
 .../service-grid/service-example.md             |  111 ++
 wiki/documentation/service-grid/service-grid.md |   79 +
 wiki/licence-prepender.sh                       |   51 +
 351 files changed, 9001 insertions(+), 12888 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
index b6aa15c,0000000..8c29898
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
@@@ -1,75 -1,0 +1,66 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.datastream;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.util.future.*;
 +import org.apache.ignite.internal.util.tostring.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +
- import java.io.*;
- 
 +/**
 + * Data streamer future.
 + */
 +class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
-     /** */
-     private static final long serialVersionUID = 0L;
- 
-     /** Data streamer. */
++    /** Data loader. */
 +    @GridToStringExclude
 +    private IgniteDataStreamerImpl dataLdr;
 +
 +    /**
 +     * Default constructor for {@link Externalizable} support.
 +     */
 +    public IgniteDataStreamerFuture() {
 +        // No-op.
 +    }
 +
 +    /**
 +     * @param ctx Context.
 +     * @param dataLdr Data streamer.
 +     */
-     IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
-         super(ctx);
- 
++    IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) {
 +        assert dataLdr != null;
 +
 +        this.dataLdr = dataLdr;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean cancel() throws IgniteCheckedException {
-         checkValid();
- 
 +        if (onCancelled()) {
 +            dataLdr.closeEx(true);
 +
 +            return true;
 +        }
 +
 +        return false;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        return S.toString(IgniteDataStreamerFuture.class, this, super.toString());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
index faba034,0000000..7867c28
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
@@@ -1,1469 -1,0 +1,1470 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.datastream;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cluster.*;
 +import org.apache.ignite.events.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.cluster.*;
 +import org.apache.ignite.internal.managers.communication.*;
 +import org.apache.ignite.internal.managers.deployment.*;
 +import org.apache.ignite.internal.managers.eventstorage.*;
 +import org.apache.ignite.internal.processors.affinity.*;
 +import org.apache.ignite.internal.processors.cache.*;
 +import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 +import org.apache.ignite.internal.processors.cache.version.*;
 +import org.apache.ignite.internal.processors.dr.*;
 +import org.apache.ignite.internal.processors.portable.*;
 +import org.apache.ignite.internal.util.*;
 +import org.apache.ignite.internal.util.future.*;
 +import org.apache.ignite.internal.util.lang.*;
 +import org.apache.ignite.internal.util.tostring.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.apache.ignite.lang.*;
 +import org.jdk8.backport.*;
 +import org.jetbrains.annotations.*;
 +
 +import java.io.*;
 +import java.util.*;
 +import java.util.Map.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.*;
 +
 +import static org.apache.ignite.events.EventType.*;
 +import static org.apache.ignite.internal.GridTopic.*;
 +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 +
 +/**
 + * Data streamer implementation.
 + */
 +@SuppressWarnings("unchecked")
 +public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
 +    /** Isolated updater. */
 +    private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
 +
 +    /** Cache updater. */
 +    private Updater<K, V> updater = ISOLATED_UPDATER;
 +
 +    /** */
 +    private byte[] updaterBytes;
 +
 +    /** Max remap count before issuing an error. */
 +    private static final int DFLT_MAX_REMAP_CNT = 32;
 +
 +    /** Log reference. */
 +    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 +
++    /** Logger. */
++    private static IgniteLogger log;
++
 +    /** Cache name ({@code null} for default cache). */
 +    private final String cacheName;
 +
 +    /** Portable enabled flag. */
 +    private final boolean portableEnabled;
 +
 +    /**
 +     *  If {@code true} then data will be transferred in compact format (only keys and values).
 +     *  Otherwise full map entry will be transferred (this is required by DR internal logic).
 +     */
 +    private final boolean compact;
 +
 +    /** Per-node buffer size. */
 +    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
 +    private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
 +
 +    /** */
 +    private int parallelOps = DFLT_MAX_PARALLEL_OPS;
 +
 +    /** */
 +    private long autoFlushFreq;
 +
 +    /** Mapping. */
 +    @GridToStringInclude
 +    private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
 +
-     /** Logger. */
-     private final IgniteLogger log;
- 
 +    /** Discovery listener. */
 +    private final GridLocalEventListener discoLsnr;
 +
 +    /** Context. */
 +    private final GridKernalContext ctx;
 +
 +    /** Communication topic for responses. */
 +    private final Object topic;
 +
 +    /** */
 +    private byte[] topicBytes;
 +
-     /** {@code True} if data streamer has been cancelled. */
++    /** {@code True} if data loader has been cancelled. */
 +    private volatile boolean cancelled;
 +
-     /** Active futures of this data streamer. */
++    /** Active futures of this data loader. */
 +    @GridToStringInclude
 +    private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
 +
 +    /** Closure to remove from active futures. */
 +    @GridToStringExclude
 +    private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
 +        @Override public void apply(IgniteInternalFuture<?> t) {
 +            boolean rmv = activeFuts.remove(t);
 +
 +            assert rmv;
 +        }
 +    };
 +
 +    /** Job peer deploy aware. */
 +    private volatile GridPeerDeployAware jobPda;
 +
 +    /** Deployment class. */
 +    private Class<?> depCls;
 +
 +    /** Future to track loading finish. */
 +    private final GridFutureAdapter<?> fut;
 +
 +    /** Public API future to track loading finish. */
 +    private final IgniteFuture<?> publicFut;
 +
 +    /** Busy lock. */
 +    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 +
 +    /** Closed flag. */
 +    private final AtomicBoolean closed = new AtomicBoolean();
 +
 +    /** */
 +    private volatile long lastFlushTime = U.currentTimeMillis();
 +
 +    /** */
 +    private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ;
 +
 +    /** */
 +    private boolean skipStore;
 +
 +    /** */
 +    private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
 +
 +    /** Whether the warning issued by {@link IgniteDataStreamerImpl#allowOverwrite()} has already been printed. */
 +    private static boolean isWarningPrinted;
 +
 +    /**
 +     * @param ctx Grid kernal context.
 +     * @param cacheName Cache name.
 +     * @param flushQ Flush queue.
 +     * @param compact If {@code true} data is transferred in compact mode (only keys and values).
 +     *                Otherwise full map entry will be transferred (this is required by DR internal logic).
 +     */
 +    public IgniteDataStreamerImpl(
 +        final GridKernalContext ctx,
 +        @Nullable final String cacheName,
 +        DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ,
 +        boolean compact
 +    ) {
 +        assert ctx != null;
 +
 +        this.ctx = ctx;
 +        this.cacheName = cacheName;
 +        this.flushQ = flushQ;
 +        this.compact = compact;
 +
-         log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class);
++        if (log == null)
++            log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class);
 +
 +        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
 +
 +        if (node == null)
 +            throw new IllegalStateException("Cache doesn't exist: " + cacheName);
 +
 +        portableEnabled = ctx.portable().portableEnabled(node, cacheName);
 +
 +        discoLsnr = new GridLocalEventListener() {
 +            @Override public void onEvent(Event evt) {
 +                assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
 +
 +                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
 +
 +                UUID id = discoEvt.eventNode().id();
 +
 +                // Remap regular mappings.
 +                final Buffer buf = bufMappings.remove(id);
 +
 +                if (buf != null) {
 +                    // Only async notification is possible since
 +                    // discovery thread may be trapped otherwise.
 +                    ctx.closure().callLocalSafe(
 +                        new Callable<Object>() {
 +                            @Override public Object call() throws Exception {
 +                                buf.onNodeLeft();
 +
 +                                return null;
 +                            }
 +                        },
 +                        true /* system pool */
 +                    );
 +                }
 +            }
 +        };
 +
 +        ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
 +
 +        // Generate unique topic for this loader.
 +        topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
 +
 +        ctx.io().addMessageListener(topic, new GridMessageListener() {
 +            @Override public void onMessage(UUID nodeId, Object msg) {
 +                assert msg instanceof GridDataLoadResponse;
 +
 +                GridDataLoadResponse res = (GridDataLoadResponse)msg;
 +
 +                if (log.isDebugEnabled())
 +                    log.debug("Received data load response: " + res);
 +
 +                Buffer buf = bufMappings.get(nodeId);
 +
 +                if (buf != null)
 +                    buf.onResponse(res);
 +
 +                else if (log.isDebugEnabled())
 +                    log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
 +            }
 +        });
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Added response listener within topic: " + topic);
 +
 +        fut = new IgniteDataStreamerFuture(ctx, this);
 +
 +        publicFut = new IgniteFutureImpl<>(fut);
 +    }
 +
 +    /**
 +     * Enters busy lock.
 +     */
 +    private void enterBusy() {
 +        if (!busyLock.enterBusy())
 +            throw new IllegalStateException("Data streamer has been closed.");
 +    }
 +
 +    /**
 +     * Leaves busy lock.
 +     */
 +    private void leaveBusy() {
 +        busyLock.leaveBusy();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public IgniteFuture<?> future() {
 +        return publicFut;
 +    }
 +
 +    /**
 +     * @return Internal future.
 +     */
 +    public IgniteInternalFuture<?> internalFuture() {
 +        return fut;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void deployClass(Class<?> depCls) {
 +        this.depCls = depCls;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void updater(Updater<K, V> updater) {
 +        A.notNull(updater, "updater");
 +
 +        this.updater = updater;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean allowOverwrite() {
 +        boolean allow = updater != ISOLATED_UPDATER;
 +        
 +        if (!allow && !isWarningPrinted) {
 +            synchronized (this) {
 +                if (!isWarningPrinted) {
 +                    log.warning("Data streamer will not overwrite existing cache entries for better performance " +
 +                        "(to change, set allowOverwrite to true)");
 +
 +                    isWarningPrinted = true;
 +                }
 +            }
 +        }
 +        
 +        return allow;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void allowOverwrite(boolean allow) {
 +        if (allow == allowOverwrite())
 +            return;
 +
 +        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
 +
 +        if (node == null)
 +            throw new IgniteException("Failed to get node for cache: " + cacheName);
 +
 +        updater = allow ? IgniteDataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean skipStore() {
 +        return skipStore;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void skipStore(boolean skipStore) {
 +        this.skipStore = skipStore;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override @Nullable public String cacheName() {
 +        return cacheName;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public int perNodeBufferSize() {
 +        return bufSize;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void perNodeBufferSize(int bufSize) {
 +        A.ensure(bufSize > 0, "bufSize > 0");
 +
 +        this.bufSize = bufSize;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public int perNodeParallelStreamOperations() {
 +        return parallelOps;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void perNodeParallelStreamOperations(int parallelOps) {
 +        this.parallelOps = parallelOps;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long autoFlushFrequency() {
 +        return autoFlushFreq;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void autoFlushFrequency(long autoFlushFreq) {
 +        A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
 +
 +        long old = this.autoFlushFreq;
 +
 +        if (autoFlushFreq != old) {
 +            this.autoFlushFreq = autoFlushFreq;
 +
 +            if (autoFlushFreq != 0 && old == 0)
 +                flushQ.add(this);
 +            else if (autoFlushFreq == 0)
 +                flushQ.remove(this);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
 +        A.notNull(entries, "entries");
 +
 +        return addData(entries.entrySet());
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
 +        A.notEmpty(entries, "entries");
 +
 +        enterBusy();
 +
 +        try {
-             GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
++            GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
 +
-             resFut.listenAsync(rmvActiveFut);
++            resFut.listen(rmvActiveFut);
 +
 +            activeFuts.add(resFut);
 +
 +            Collection<K> keys = null;
 +
 +            if (entries.size() > 1) {
 +                keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
 +
 +                for (Map.Entry<K, V> entry : entries)
 +                    keys.add(entry.getKey());
 +            }
 +
 +            load0(entries, resFut, keys, 0);
 +
 +            return new IgniteFutureImpl<>(resFut);
 +        }
 +        catch (IgniteException e) {
-             return new IgniteFinishedFutureImpl<>(ctx, e);
++            return new IgniteFinishedFutureImpl<>(e);
 +        }
 +        finally {
 +            leaveBusy();
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
 +        A.notNull(entry, "entry");
 +
 +        return addData(F.asList(entry));
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public IgniteFuture<?> addData(K key, V val) {
 +        A.notNull(key, "key");
 +
 +        return addData(new Entry0<>(key, val));
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public IgniteFuture<?> removeData(K key) {
 +        return addData(key, null);
 +    }
 +
 +    /**
 +     * @param entries Entries.
 +     * @param resFut Result future.
 +     * @param activeKeys Active keys.
 +     * @param remaps Remaps count.
 +     */
 +    private void load0(
 +        Collection<? extends Map.Entry<K, V>> entries,
 +        final GridFutureAdapter<Object> resFut,
 +        @Nullable final Collection<K> activeKeys,
 +        final int remaps
 +    ) {
 +        assert entries != null;
 +
 +        Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
 +
 +        boolean initPda = ctx.deploy().enabled() && jobPda == null;
 +
 +        for (Map.Entry<K, V> entry : entries) {
 +            List<ClusterNode> nodes;
 +
 +            try {
 +                K key = entry.getKey();
 +
 +                assert key != null;
 +
 +                if (initPda) {
 +                    jobPda = new DataStreamerPda(key, entry.getValue(), updater);
 +
 +                    initPda = false;
 +                }
 +
 +                nodes = nodes(key);
 +            }
 +            catch (IgniteCheckedException e) {
 +                resFut.onDone(e);
 +
 +                return;
 +            }
 +
 +            if (F.isEmpty(nodes)) {
 +                resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
 +                    "(no nodes with cache found in topology) [infos=" + entries.size() +
 +                    ", cacheName=" + cacheName + ']'));
 +
 +                return;
 +            }
 +
 +            for (ClusterNode node : nodes) {
 +                Collection<Map.Entry<K, V>> col = mappings.get(node);
 +
 +                if (col == null)
 +                    mappings.put(node, col = new ArrayList<>());
 +
 +                col.add(entry);
 +            }
 +        }
 +
 +        for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
 +            final UUID nodeId = e.getKey().id();
 +
 +            Buffer buf = bufMappings.get(nodeId);
 +
 +            if (buf == null) {
 +                Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
 +
 +                if (old != null)
 +                    buf = old;
 +            }
 +
 +            final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
 +
 +            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
 +                @Override public void apply(IgniteInternalFuture<?> t) {
 +                    try {
 +                        t.get();
 +
 +                        if (activeKeys != null) {
 +                            for (Map.Entry<K, V> e : entriesForNode)
 +                                activeKeys.remove(e.getKey());
 +
 +                            if (activeKeys.isEmpty())
 +                                resFut.onDone();
 +                        }
 +                        else {
 +                            assert entriesForNode.size() == 1;
 +
 +                            // That has been a single key,
 +                            // so complete result future right away.
 +                            resFut.onDone();
 +                        }
 +                    }
 +                    catch (IgniteCheckedException e1) {
 +                        if (log.isDebugEnabled())
 +                            log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
 +
 +                        if (cancelled) {
 +                            resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
 +                                IgniteDataStreamerImpl.this, e1));
 +                        }
 +                        else if (remaps + 1 > maxRemapCnt) {
 +                            resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
 +                                + remaps), e1);
 +                        }
 +                        else
 +                            load0(entriesForNode, resFut, activeKeys, remaps + 1);
 +                    }
 +                }
 +            };
 +
 +            GridFutureAdapter<?> f;
 +
 +            try {
 +                f = buf.update(entriesForNode, lsnr);
 +            }
 +            catch (IgniteInterruptedCheckedException e1) {
 +                resFut.onDone(e1);
 +
 +                return;
 +            }
 +
 +            if (ctx.discovery().node(nodeId) == null) {
 +                if (bufMappings.remove(nodeId, buf))
 +                    buf.onNodeLeft();
 +
 +                if (f != null)
 +                    f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
 +                        "(node has left): " + nodeId));
 +            }
 +        }
 +    }
 +
 +    /**
 +     * @param key Key to map.
 +     * @return Nodes to send requests to.
 +     * @throws IgniteCheckedException If failed.
 +     */
 +    private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
 +        GridAffinityProcessor aff = ctx.affinity();
 +
 +        return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
 +            Collections.singletonList(aff.mapKeyToNode(cacheName, key));
 +    }
 +
 +    /**
 +     * Performs flush.
 +     *
 +     * @throws IgniteCheckedException If failed.
 +     */
 +    private void doFlush() throws IgniteCheckedException {
 +        lastFlushTime = U.currentTimeMillis();
 +
 +        List<IgniteInternalFuture> activeFuts0 = null;
 +
 +        int doneCnt = 0;
 +
 +        for (IgniteInternalFuture<?> f : activeFuts) {
 +            if (!f.isDone()) {
 +                if (activeFuts0 == null)
 +                    activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
 +
 +                activeFuts0.add(f);
 +            }
 +            else {
 +                f.get();
 +
 +                doneCnt++;
 +            }
 +        }
 +
 +        if (activeFuts0 == null || activeFuts0.isEmpty())
 +            return;
 +
 +        while (true) {
 +            Queue<IgniteInternalFuture<?>> q = null;
 +
 +            for (Buffer buf : bufMappings.values()) {
 +                IgniteInternalFuture<?> flushFut = buf.flush();
 +
 +                if (flushFut != null) {
 +                    if (q == null)
 +                        q = new ArrayDeque<>(bufMappings.size() * 2);
 +
 +                    q.add(flushFut);
 +                }
 +            }
 +
 +            if (q != null) {
 +                assert !q.isEmpty();
 +
 +                boolean err = false;
 +
 +                for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
 +                    try {
 +                        fut.get();
 +                    }
 +                    catch (IgniteCheckedException e) {
 +                        if (log.isDebugEnabled())
 +                            log.debug("Failed to flush buffer: " + e);
 +
 +                        err = true;
 +                    }
 +                }
 +
 +                if (err)
 +                    // Remaps needed - flush buffers.
 +                    continue;
 +            }
 +
 +            doneCnt = 0;
 +
 +            for (int i = 0; i < activeFuts0.size(); i++) {
 +                IgniteInternalFuture f = activeFuts0.get(i);
 +
 +                if (f == null)
 +                    doneCnt++;
 +                else if (f.isDone()) {
 +                    f.get();
 +
 +                    doneCnt++;
 +
 +                    activeFuts0.set(i, null);
 +                }
 +                else
 +                    break;
 +            }
 +
 +            if (doneCnt == activeFuts0.size())
 +                return;
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("ForLoopReplaceableByForEach")
 +    @Override public void flush() throws IgniteException {
 +        enterBusy();
 +
 +        try {
 +            doFlush();
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw U.convertException(e);
 +        }
 +        finally {
 +            leaveBusy();
 +        }
 +    }
 +
 +    /**
 +     * Flushes every internal buffer if buffer was flushed before passed in
 +     * threshold.
 +     * <p>
 +     * Does not wait for result and does not fail on errors assuming that this method
 +     * should be called periodically.
 +     */
 +    @Override public void tryFlush() throws IgniteInterruptedException {
 +        if (!busyLock.enterBusy())
 +            return;
 +
 +        try {
 +            for (Buffer buf : bufMappings.values())
 +                buf.flush();
 +
 +            lastFlushTime = U.currentTimeMillis();
 +        }
 +        catch (IgniteInterruptedCheckedException e) {
 +            throw U.convertException(e);
 +        }
 +        finally {
 +            leaveBusy();
 +        }
 +    }
 +
 +    /**
 +     * @param cancel {@code True} to close with cancellation.
 +     * @throws IgniteException If failed.
 +     */
 +    @Override public void close(boolean cancel) throws IgniteException {
 +        try {
 +            closeEx(cancel);
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw U.convertException(e);
 +        }
 +    }
 +
 +    /**
 +     * @param cancel {@code True} to close with cancellation.
 +     * @throws IgniteCheckedException If failed.
 +     */
 +    public void closeEx(boolean cancel) throws IgniteCheckedException {
 +        if (!closed.compareAndSet(false, true))
 +            return;
 +
 +        busyLock.block();
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']');
 +
 +        IgniteCheckedException e = null;
 +
 +        try {
 +            // Assuming that no methods are called on this loader after this method is called.
 +            if (cancel) {
 +                cancelled = true;
 +
 +                for (Buffer buf : bufMappings.values())
 +                    buf.cancelAll();
 +            }
 +            else
 +                doFlush();
 +
 +            ctx.event().removeLocalEventListener(discoLsnr);
 +
 +            ctx.io().removeMessageListener(topic);
 +        }
 +        catch (IgniteCheckedException e0) {
 +            e = e0;
 +        }
 +
 +        fut.onDone(null, e);
 +
 +        if (e != null)
 +            throw e;
 +    }
 +
 +    /**
 +     * @return {@code true} If the loader is closed.
 +     */
 +    boolean isClosed() {
 +        return fut.isDone();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void close() throws IgniteException {
 +        close(false);
 +    }
 +
 +    /**
 +     * @return Max remap count.
 +     */
 +    public int maxRemapCount() {
 +        return maxRemapCnt;
 +    }
 +
 +    /**
 +     * @param maxRemapCnt New max remap count.
 +     */
 +    public void maxRemapCount(int maxRemapCnt) {
 +        this.maxRemapCnt = maxRemapCnt;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        return S.toString(IgniteDataStreamerImpl.class, this);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long getDelay(TimeUnit unit) {
 +        return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
 +    }
 +
 +    /**
 +     * @return Next flush time.
 +     */
 +    private long nextFlushTime() {
 +        return lastFlushTime + autoFlushFreq;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public int compareTo(Delayed o) {
 +        return nextFlushTime() > ((IgniteDataStreamerImpl)o).nextFlushTime() ? 1 : -1;
 +    }
 +
 +    /**
 +     *
 +     */
 +    private class Buffer {
 +        /** Node. */
 +        private final ClusterNode node;
 +
 +        /** Active futures. */
 +        private final Collection<IgniteInternalFuture<Object>> locFuts;
 +
 +        /** Buffered entries. */
 +        private List<Map.Entry<K, V>> entries;
 +
 +        /** */
 +        @GridToStringExclude
 +        private GridFutureAdapter<Object> curFut;
 +
 +        /** Local node flag. */
 +        private final boolean isLocNode;
 +
 +        /** ID generator. */
 +        private final AtomicLong idGen = new AtomicLong();
 +
 +        /** Active futures. */
 +        private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
 +
 +        /** */
 +        private final Semaphore sem;
 +
 +        /** Closure to signal on task finish. */
 +        @GridToStringExclude
 +        private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
 +            @Override public void apply(IgniteInternalFuture<Object> t) {
 +                signalTaskFinished(t);
 +            }
 +        };
 +
 +        /**
 +         * @param node Node.
 +         */
 +        Buffer(ClusterNode node) {
 +            assert node != null;
 +
 +            this.node = node;
 +
 +            locFuts = new GridConcurrentHashSet<>();
 +            reqs = new ConcurrentHashMap8<>();
 +
 +            // Cache local node flag.
 +            isLocNode = node.equals(ctx.discovery().localNode());
 +
 +            entries = newEntries();
-             curFut = new GridFutureAdapter<>(ctx);
-             curFut.listenAsync(signalC);
++            curFut = new GridFutureAdapter<>();
++            curFut.listen(signalC);
 +
 +            sem = new Semaphore(parallelOps);
 +        }
 +
 +        /**
 +         * @param newEntries Entries to add to this buffer.
 +         * @param lsnr Listener for the operation future.
 +         * @throws IgniteInterruptedCheckedException If failed.
 +         * @return Future for operation.
 +         */
 +        @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
 +            IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
 +            List<Map.Entry<K, V>> entries0 = null;
 +            GridFutureAdapter<Object> curFut0;
 +
 +            synchronized (this) {
 +                curFut0 = curFut;
 +
-                 curFut0.listenAsync(lsnr);
++                curFut0.listen(lsnr);
 +
 +                for (Map.Entry<K, V> entry : newEntries)
 +                    entries.add(entry);
 +
 +                if (entries.size() >= bufSize) {
 +                    entries0 = entries;
 +
 +                    entries = newEntries();
-                     curFut = new GridFutureAdapter<>(ctx);
-                     curFut.listenAsync(signalC);
++                    curFut = new GridFutureAdapter<>();
++                    curFut.listen(signalC);
 +                }
 +            }
 +
 +            if (entries0 != null) {
 +                submit(entries0, curFut0);
 +
 +                if (cancelled)
 +                    curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this));
 +            }
 +
 +            return curFut0;
 +        }
 +
 +        /**
 +         * @return Fresh collection with some space for outgrowth.
 +         */
 +        private List<Map.Entry<K, V>> newEntries() {
 +            return new ArrayList<>((int)(bufSize * 1.2));
 +        }
 +
 +        /**
 +         * @return Future if any submitted.
 +         *
 +         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
 +         */
 +        @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
 +            List<Map.Entry<K, V>> entries0 = null;
 +            GridFutureAdapter<Object> curFut0 = null;
 +
 +            synchronized (this) {
 +                if (!entries.isEmpty()) {
 +                    entries0 = entries;
 +                    curFut0 = curFut;
 +
 +                    entries = newEntries();
-                     curFut = new GridFutureAdapter<>(ctx);
-                     curFut.listenAsync(signalC);
++                    curFut = new GridFutureAdapter<>();
++                    curFut.listen(signalC);
 +                }
 +            }
 +
 +            if (entries0 != null)
 +                submit(entries0, curFut0);
 +
 +            // Create compound future for this flush.
 +            GridCompoundFuture<Object, Object> res = null;
 +
 +            for (IgniteInternalFuture<Object> f : locFuts) {
 +                if (res == null)
-                     res = new GridCompoundFuture<>(ctx);
++                    res = new GridCompoundFuture<>();
 +
 +                res.add(f);
 +            }
 +
 +            for (IgniteInternalFuture<Object> f : reqs.values()) {
 +                if (res == null)
-                     res = new GridCompoundFuture<>(ctx);
++                    res = new GridCompoundFuture<>();
 +
 +                res.add(f);
 +            }
 +
 +            if (res != null)
 +                res.markInitialized();
 +
 +            return res;
 +        }
 +
 +        /**
 +         * Increments active tasks count by acquiring {@code sem} through {@code U.acquire}.
 +         * {@code signalTaskFinished} releases the same semaphore.
 +         *
 +         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
 +         */
 +        private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
 +            U.acquire(sem);
 +        }
 +
 +        /**
 +         * Releases one permit of {@code sem}. The passed future is only asserted to be
 +         * non-null; its result is not inspected here.
 +         *
 +         * @param f Future that finished.
 +         */
 +        private void signalTaskFinished(IgniteInternalFuture<Object> f) {
 +            assert f != null;
 +
 +            sem.release();
 +        }
 +
 +        /**
 +         * Submits a batch of entries, either as a local update job (when this buffer belongs to
 +         * the local node) or as a {@code GridDataLoadRequest} sent to the remote node.
 +         * <p>
 +         * If the batch cannot be marshalled or the deploy class cannot be deployed, the request
 +         * is not sent and {@code curFut} is completed with the error, the same way a send
 +         * failure is reported.
 +         *
 +         * @param entries Entries to submit.
 +         * @param curFut Current future.
 +         * @throws IgniteInterruptedCheckedException If interrupted.
 +         */
 +        private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
 +            throws IgniteInterruptedCheckedException {
 +            assert entries != null;
 +            assert !entries.isEmpty();
 +            assert curFut != null;
 +
 +            incrementActiveTasks();
 +
 +            IgniteInternalFuture<Object> fut;
 +
 +            if (isLocNode) {
 +                fut = ctx.closure().callLocalSafe(
 +                    new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
 +
 +                locFuts.add(fut);
 +
-                 fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() {
++                fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
 +                    @Override public void apply(IgniteInternalFuture<Object> t) {
 +                        try {
 +                            boolean rmv = locFuts.remove(t);
 +
 +                            assert rmv;
 +
 +                            curFut.onDone(t.get());
 +                        }
 +                        catch (IgniteCheckedException e) {
 +                            curFut.onDone(e);
 +                        }
 +                    }
 +                });
 +            }
 +            else {
 +                byte[] entriesBytes;
 +
 +                try {
 +                    if (compact) {
 +                        entriesBytes = ctx.config().getMarshaller()
 +                            .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
 +                    }
 +                    else
 +                        entriesBytes = ctx.config().getMarshaller().marshal(entries);
 +
 +                    // Updater and topic bytes are marshalled once and reused by later batches.
 +                    if (updaterBytes == null) {
 +                        assert updater != null;
 +
 +                        updaterBytes = ctx.config().getMarshaller().marshal(updater);
 +                    }
 +
 +                    if (topicBytes == null)
 +                        topicBytes = ctx.config().getMarshaller().marshal(topic);
 +                }
 +                catch (IgniteCheckedException e) {
 +                    U.error(log, "Failed to marshal (request will not be sent).", e);
 +
 +                    // Nothing will ever answer this batch, so fail its future right away instead
 +                    // of leaving waiters (and, presumably, the permit taken above) stuck forever.
 +                    curFut.onDone(e);
 +
 +                    return;
 +                }
 +
 +                GridDeployment dep = null;
 +                GridPeerDeployAware jobPda0 = null;
 +
 +                if (ctx.deploy().enabled()) {
 +                    try {
 +                        jobPda0 = jobPda;
 +
 +                        assert jobPda0 != null;
 +
 +                        dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
 +
 +                        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
 +
 +                        if (cache != null)
 +                            cache.context().deploy().onEnter();
 +                    }
 +                    catch (IgniteCheckedException e) {
 +                        U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
 +
 +                        // Same reasoning as for the marshalling failure: complete the batch future.
 +                        curFut.onDone(e);
 +
 +                        return;
 +                    }
 +
 +                    if (dep == null)
 +                        U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
 +                }
 +
 +                long reqId = idGen.incrementAndGet();
 +
 +                fut = curFut;
 +
 +                // Register before sending so that the response handler can find the future.
 +                reqs.put(reqId, (GridFutureAdapter<Object>)fut);
 +
 +                GridDataLoadRequest req = new GridDataLoadRequest(
 +                    reqId,
 +                    topicBytes,
 +                    cacheName,
 +                    updaterBytes,
 +                    entriesBytes,
 +                    true,
 +                    skipStore,
 +                    dep != null ? dep.deployMode() : null,
 +                    dep != null ? jobPda0.deployClass().getName() : null,
 +                    dep != null ? dep.userVersion() : null,
 +                    dep != null ? dep.participants() : null,
 +                    dep != null ? dep.classLoaderId() : null,
 +                    dep == null);
 +
 +                try {
 +                    ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
 +
 +                    if (log.isDebugEnabled())
 +                        log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
 +                }
 +                catch (IgniteCheckedException e) {
 +                    if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
 +                        ((GridFutureAdapter<Object>)fut).onDone(e);
 +                    else
 +                        ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
 +                            "request (node has left): " + node.id()));
 +                }
 +            }
 +        }
 +
 +        /**
 +         * Forcibly completes every future in {@code reqs} and the current future with a
 +         * {@code ClusterTopologyCheckedException} stating that the node has left.
 +         * <p>
 +         * Asserts that this buffer is not for the local node and that it is no longer the
 +         * buffer registered in {@code bufMappings} for the node.
 +         */
 +        void onNodeLeft() {
 +            assert !isLocNode;
 +            assert bufMappings.get(node.id()) != this;
 +
 +            if (log.isDebugEnabled())
 +                log.debug("Forcibly completing futures (node has left): " + node.id());
 +
 +            Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
 +                "(node has left): " + node.id());
 +
 +            for (GridFutureAdapter<Object> f : reqs.values())
 +                f.onDone(e);
 +
 +            // Make sure to complete current future.
 +            GridFutureAdapter<Object> curFut0;
 +
 +            synchronized (this) {
 +                curFut0 = curFut;
 +            }
 +
 +            curFut0.onDone(e);
 +        }
 +
 +        /**
 +         * Handles a data load response: removes the future registered in {@code reqs} under the
 +         * response's request ID and completes it. If the response carries error bytes, they are
 +         * unmarshalled (with the class loader of {@code jobPda} when set, otherwise the grid
 +         * class loader) and the future is completed with that error; an unmarshalling failure
 +         * completes the future with a wrapping {@code IgniteCheckedException} instead.
 +         * A response with no matching future is ignored.
 +         *
 +         * @param res Response.
 +         */
 +        void onResponse(GridDataLoadResponse res) {
 +            if (log.isDebugEnabled())
 +                log.debug("Received data load response: " + res);
 +
 +            GridFutureAdapter<?> f = reqs.remove(res.requestId());
 +
 +            if (f == null) {
 +                if (log.isDebugEnabled())
 +                    log.debug("Future for request has not been found: " + res.requestId());
 +
 +                return;
 +            }
 +
 +            Throwable err = null;
 +
 +            byte[] errBytes = res.errorBytes();
 +
 +            if (errBytes != null) {
 +                try {
 +                    GridPeerDeployAware jobPda0 = jobPda;
 +
 +                    err = ctx.config().getMarshaller().unmarshal(
 +                        errBytes,
 +                        jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
 +                }
 +                catch (IgniteCheckedException e) {
 +                    f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
 +
 +                    return;
 +                }
 +            }
 +
 +            f.onDone(null, err);
 +
 +            if (log.isDebugEnabled())
 +                log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
 +        }
 +
 +        /**
 +         * Cancels every future in {@code locFuts} (cancellation failures are logged and do not
 +         * stop the loop) and completes every future in {@code reqs} with an
 +         * {@code IgniteCheckedException} saying the data streamer has been cancelled.
 +         */
 +        void cancelAll() {
 +            IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this);
 +
 +            for (IgniteInternalFuture<?> f : locFuts) {
 +                try {
 +                    f.cancel();
 +                }
 +                catch (IgniteCheckedException e) {
 +                    U.error(log, "Failed to cancel mini-future.", e);
 +                }
 +            }
 +
 +            for (GridFutureAdapter<?> f : reqs.values())
 +                f.onDone(err);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            final int entriesCnt;
 +
 +            // Read the entries count while holding this object's monitor.
 +            synchronized (this) {
 +                entriesCnt = entries.size();
 +            }
 +
 +            return S.toString(Buffer.class, this,
 +                "entriesCnt", entriesCnt,
 +                "locFutsSize", locFuts.size(),
 +                "reqsSize", reqs.size());
 +        }
 +    }
 +
 +    /**
 +     * Data streamer peer-deploy aware. Lazily detects and caches the deploy class and its
 +     * class loader from the enclosing streamer's {@code depCls} or, when that is not set,
 +     * from the objects passed to the constructor.
 +     */
 +    private class DataStreamerPda implements GridPeerDeployAware {
 +        /** */
 +        private static final long serialVersionUID = 0L;
 +
 +        /** Deploy class. */
 +        private Class<?> cls;
 +
 +        /** Class loader. */
 +        private ClassLoader ldr;
 +
 +        /** Collection of objects to detect deploy class and class loader. */
 +        private Collection<Object> objs;
 +
 +        /**
 +         * Constructs data streamer peer-deploy aware.
 +         *
 +         * @param objs Collection of objects to detect deploy class and class loader.
 +         */
 +        private DataStreamerPda(Object... objs) {
 +            this.objs = Arrays.asList(objs);
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Resolved on first call and cached. Uses {@code depCls} when it is set; otherwise
 +         * walks {@code objs} until an object whose detected class is not a JDK class is found,
 +         * and falls back to {@code IgniteDataStreamerImpl.class} if there is none.
 +         */
 +        @Override public Class<?> deployClass() {
 +            if (cls == null) {
 +                Class<?> cls0 = null;
 +
 +                if (depCls != null)
 +                    cls0 = depCls;
 +                else {
 +                    for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
 +                        Object o = it.next();
 +
 +                        if (o != null)
 +                            cls0 = U.detectClass(o);
 +                    }
 +
 +                    if (cls0 == null || U.isJdk(cls0))
 +                        cls0 = IgniteDataStreamerImpl.class;
 +                }
 +
 +                assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']';
 +
 +                cls = cls0;
 +            }
 +
 +            return cls;
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Resolved on first call and cached: the class loader of {@link #deployClass()}, or
 +         * {@code U.gridClassLoader()} when that class reports a {@code null} loader.
 +         */
 +        @Override public ClassLoader classLoader() {
 +            if (ldr == null) {
 +                ClassLoader ldr0 = deployClass().getClassLoader();
 +
 +                // Safety.
 +                if (ldr0 == null)
 +                    ldr0 = U.gridClassLoader();
 +
 +                assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']';
 +
 +                ldr = ldr0;
 +            }
 +
 +            return ldr;
 +        }
 +    }
 +
 +    /**
 +     * Mutable key-value pair implementing {@link Map.Entry} with {@link Externalizable}
 +     * serialization (key is written first, then value).
 +     */
 +    private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
 +        /** */
 +        private static final long serialVersionUID = 0L;
 +
 +        /** Entry key. */
 +        private K key;
 +
 +        /** Entry value. */
 +        private V val;
 +
 +        /**
 +         * For {@link Externalizable}.
 +         */
 +        @SuppressWarnings("UnusedDeclaration")
 +        public Entry0() {
 +            // No-op.
 +        }
 +
 +        /**
 +         * @param key Key (asserted to be non-null).
 +         * @param val Value.
 +         */
 +        private Entry0(K key, @Nullable V val) {
 +            assert key != null;
 +
 +            this.val = val;
 +            this.key = key;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public K getKey() {
 +            return this.key;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public V getValue() {
 +            return this.val;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public V setValue(V val) {
 +            V prev = this.val;
 +
 +            this.val = val;
 +
 +            return prev;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void writeExternal(ObjectOutput out) throws IOException {
 +            // Key first, then value; readExternal() consumes them in the same order.
 +            out.writeObject(key);
 +            out.writeObject(val);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
 +            K k = (K)in.readObject();
 +
 +            key = k;
 +
 +            V v = (V)in.readObject();
 +
 +            val = v;
 +        }
 +    }
 +
 +    /**
 +     * Wrapper collection with special compact serialization of map entries: the size is
 +     * written first, followed by each entry's key and value as separate objects. On read
 +     * the entries are rebuilt as {@link Entry0} instances in an {@link ArrayList}.
 +     */
 +    private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
 +        /** */
 +        private static final long serialVersionUID = 0L;
 +
 +        /**  Wrapped delegate. */
 +        private Collection<Map.Entry<K, V>> delegate;
 +
 +        /** Optional portable processor for converting values (not written by {@code writeExternal}). */
 +        private GridPortableProcessor portable;
 +
 +        /**
 +         * @param delegate Delegate.
 +         * @param portable Portable processor, or {@code null} to write keys and values as is.
 +         */
 +        private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
 +            this.delegate = delegate;
 +            this.portable = portable;
 +        }
 +
 +        /**
 +         * For {@link Externalizable}.
 +         */
 +        public Entries0() {
 +            // No-op.
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public Iterator<Entry<K, V>> iterator() {
 +            return delegate.iterator();
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public int size() {
 +            return delegate.size();
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * When a portable processor is set, keys and values are passed through
 +         * {@code marshalToPortable} before being written.
 +         */
 +        @Override public void writeExternal(ObjectOutput out) throws IOException {
 +            out.writeInt(delegate.size());
 +
 +            boolean portableEnabled = portable != null;
 +
 +            for (Map.Entry<K, V> entry : delegate) {
 +                if (portableEnabled) {
 +                    out.writeObject(portable.marshalToPortable(entry.getKey()));
 +                    out.writeObject(portable.marshalToPortable(entry.getValue()));
 +                }
 +                else {
 +                    out.writeObject(entry.getKey());
 +                    out.writeObject(entry.getValue());
 +                }
 +            }
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Reads the entry count and then that many key/value pairs into a new list.
 +         */
 +        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
 +            int sz = in.readInt();
 +
 +            delegate = new ArrayList<>(sz);
 +
 +            for (int i = 0; i < sz; i++) {
 +                Object k = in.readObject();
 +                Object v = in.readObject();
 +
 +                delegate.add(new Entry0<>((K)k, (V)v));
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Isolated updater which only loads entry initial value.
 +     */
 +    private static class IsolatedUpdater<K, V> implements Updater<K, V> {
 +        /** */
 +        private static final long serialVersionUID = 0L;
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Works on the internal cache behind the given proxy (the DHT cache when the internal
 +         * cache is near). One cache version is generated for the whole batch. For each entry
 +         * the cache entry is obtained, unswapped, given its initial value and touched for
 +         * eviction. Invalid-partition and removed-entry exceptions are ignored; any other
 +         * {@code IgniteCheckedException} is logged and processing continues with the next entry.
 +         */
 +        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
 +            IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
 +
 +            GridCacheAdapter<K, V> internalCache = proxy.context().cache();
 +
 +            if (internalCache.isNear())
 +                internalCache = internalCache.context().near().dht();
 +
 +            GridCacheContext<K, V> cctx = internalCache.context();
 +
 +            long topVer = cctx.affinity().affinityTopologyVersion();
 +
 +            GridCacheVersion ver = cctx.versions().next(topVer);
 +
 +            boolean portable = cctx.portableEnabled();
 +
 +            for (Map.Entry<K, V> e : entries) {
 +                try {
 +                    K key = e.getKey();
 +                    V val = e.getValue();
 +
 +                    if (portable) {
 +                        key = (K)cctx.marshalToPortable(key);
 +                        val = (V)cctx.marshalToPortable(val);
 +                    }
 +
 +                    GridCacheEntryEx<K, V> entry = internalCache.entryEx(key, topVer);
 +
 +                    entry.unswap(true, false);
 +
 +                    entry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer,
 +                        GridDrType.DR_LOAD);
 +
 +                    cctx.evicts().touch(entry, topVer);
 +                }
 +                catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
 +                    // No-op.
 +                }
 +                catch (IgniteCheckedException ex) {
 +                    IgniteLogger log = cache.unwrap(Ignite.class).log();
 +
 +                    U.error(log, "Failed to set initial value for cache entry: " + e, ex);
 +                }
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
index 9dd4a8e,0000000..2934153
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
@@@ -1,316 -1,0 +1,316 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.datastream;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.managers.communication.*;
 +import org.apache.ignite.internal.managers.deployment.*;
 +import org.apache.ignite.internal.processors.*;
 +import org.apache.ignite.internal.util.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.apache.ignite.internal.util.worker.*;
 +import org.apache.ignite.marshaller.*;
 +import org.apache.ignite.thread.*;
 +import org.jetbrains.annotations.*;
 +
 +import java.util.*;
 +import java.util.concurrent.*;
 +
 +import static org.apache.ignite.internal.GridTopic.*;
 +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 +
 +/**
 + *
 + */
 +public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
 +    /** Active loaders set (access is not supposed to be highly concurrent). */
 +    private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
 +
 +    /** Busy lock. */
 +    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 +
 +    /** Flushing thread. */
 +    private Thread flusher;
 +
 +    /** Delay queue the flushing thread takes data streamers from. */
 +    private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
 +
 +    /** Marshaller. */
 +    private final Marshaller marsh;
 +
 +    /**
 +     * Registers a {@code TOPIC_DATALOAD} message listener that forwards incoming
 +     * {@code GridDataLoadRequest} messages to {@code processDataLoadRequest}, and stores
 +     * the marshaller taken from the node configuration.
 +     *
 +     * @param ctx Kernal context.
 +     */
 +    public IgniteDataStreamerProcessor(GridKernalContext ctx) {
 +        super(ctx);
 +
 +        ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() {
 +            @Override public void onMessage(UUID nodeId, Object msg) {
 +                assert msg instanceof GridDataLoadRequest;
 +
 +                processDataLoadRequest(nodeId, (GridDataLoadRequest)msg);
 +            }
 +        });
 +
 +        marsh = ctx.config().getMarshaller();
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Does nothing on daemon nodes. Otherwise starts the flusher thread, which repeatedly
 +     * takes a data streamer from {@code flushQ}, calls {@code tryFlush()} on it and puts it
 +     * back into the queue; closed streamers are dropped instead. The thread exits once the
 +     * busy lock can no longer be entered or the worker is cancelled.
 +     */
 +    @Override public void start() throws IgniteCheckedException {
 +        if (ctx.config().isDaemon())
 +            return;
 +
 +        flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
 +            @Override protected void body() throws InterruptedException {
 +                while (!isCancelled()) {
 +                    IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
 +
 +                    if (!busyLock.enterBusy())
 +                        return;
 +
 +                    try {
 +                        if (ldr.isClosed())
 +                            continue;
 +
 +                        ldr.tryFlush();
 +
 +                        flushQ.offer(ldr);
 +                    }
 +                    finally {
 +                        busyLock.leaveBusy();
 +                    }
 +                }
 +            }
 +        });
 +
 +        flusher.start();
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Started data streamer processor.");
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Does nothing on daemon nodes. Otherwise removes the {@code TOPIC_DATALOAD} listener,
 +     * blocks the busy lock, interrupts and joins the flusher thread, and then closes every
 +     * streamer still present in {@code ldrs} via {@code closeEx(cancel)}. Failures to close
 +     * a streamer are logged and do not stop the loop.
 +     */
 +    @Override public void onKernalStop(boolean cancel) {
 +        if (ctx.config().isDaemon())
 +            return;
 +
 +        ctx.io().removeMessageListener(TOPIC_DATALOAD);
 +
 +        busyLock.block();
 +
 +        U.interrupt(flusher);
 +        U.join(flusher, log);
 +
 +        for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
 +            if (log.isDebugEnabled())
 +                log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
 +
 +            try {
 +                ldr.closeEx(cancel);
 +            }
 +            catch (IgniteInterruptedCheckedException e) {
 +                U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
 +            }
 +            catch (IgniteCheckedException e) {
 +                U.error(log, "Failed to close data streamer: " + ldr, e);
 +            }
 +        }
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Stopped data streamer processor.");
 +    }
 +
 +    /**
 +     * Creates a new data streamer, adds it to {@code ldrs} and registers a listener on its
 +     * internal future that removes it from {@code ldrs} again once that future completes.
 +     *
 +     * @param cacheName Cache name ({@code null} for default cache).
 +     * @param compact {@code true} if data streamer should transfer data in compact format.
 +     * @return Data streamer.
 +     * @throws IllegalStateException If the busy lock cannot be entered (grid is stopping).
 +     */
 +    public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) {
 +        if (!busyLock.enterBusy())
 +            throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
 +
 +        try {
 +            final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
 +
 +            ldrs.add(ldr);
 +
-             ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() {
++            ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() {
 +                @Override public void apply(IgniteInternalFuture<?> f) {
 +                    boolean b = ldrs.remove(ldr);
 +
 +                    assert b : "Loader has not been added to set: " + ldr;
 +
 +                    if (log.isDebugEnabled())
 +                        log.debug("Loader has been completed: " + ldr);
 +                }
 +            });
 +
 +            return ldr;
 +        }
 +        finally {
 +            busyLock.leaveBusy();
 +        }
 +    }
 +
 +    /**
 +     * Creates a data streamer with compact data transfer enabled; delegates to
 +     * {@link #dataStreamer(String, boolean)} passing {@code true}.
 +     *
 +     * @param cacheName Cache name ({@code null} for default cache).
 +     * @return Data streamer.
 +     */
 +    public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
 +        return dataStreamer(cacheName, true);
 +    }
 +
 +    /**
 +     * Processes a data load request received from another node. Steps, all performed while
 +     * holding the busy lock (the request is ignored if the lock cannot be entered):
 +     * <ol>
 +     * <li>Unmarshal the response topic; on failure the error is logged and no response
 +     *     is sent.</li>
 +     * <li>Pick the class loader: the grid class loader when the request forces local
 +     *     deployment, otherwise the class loader of the global deployment looked up for the
 +     *     request (an error response is sent if no deployment is found).</li>
 +     * <li>Unmarshal the entries and the updater; on failure an error response is sent.</li>
 +     * <li>Run an {@code IgniteDataStreamerUpdateJob} in the calling thread and send a
 +     *     response carrying the exception it threw, if any.</li>
 +     * </ol>
 +     *
 +     * @param nodeId Sender ID.
 +     * @param req Request.
 +     */
 +    private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
 +        if (!busyLock.enterBusy()) {
 +            if (log.isDebugEnabled())
 +                log.debug("Ignoring data load request (node is stopping): " + req);
 +
 +            return;
 +        }
 +
 +        try {
 +            if (log.isDebugEnabled())
 +                log.debug("Processing data load request: " + req);
 +
 +            Object topic;
 +
 +            try {
 +                topic = marsh.unmarshal(req.responseTopicBytes(), null);
 +            }
 +            catch (IgniteCheckedException e) {
 +                U.error(log, "Failed to unmarshal topic from request: " + req, e);
 +
 +                return;
 +            }
 +
 +            ClassLoader clsLdr;
 +
 +            if (req.forceLocalDeployment())
 +                clsLdr = U.gridClassLoader();
 +            else {
 +                GridDeployment dep = ctx.deploy().getGlobalDeployment(
 +                    req.deploymentMode(),
 +                    req.sampleClassName(),
 +                    req.sampleClassName(),
 +                    req.userVersion(),
 +                    nodeId,
 +                    req.classLoaderId(),
 +                    req.participants(),
 +                    null);
 +
 +                if (dep == null) {
 +                    sendResponse(nodeId,
 +                        topic,
 +                        req.requestId(),
 +                        new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
 +                            ", req=" + req + ']'),
 +                        false);
 +
 +                    return;
 +                }
 +
 +                clsLdr = dep.classLoader();
 +            }
 +
 +            Collection<Map.Entry<K, V>> col;
 +            IgniteDataStreamer.Updater<K, V> updater;
 +
 +            try {
 +                col = marsh.unmarshal(req.collectionBytes(), clsLdr);
 +                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
 +            }
 +            catch (IgniteCheckedException e) {
 +                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
 +
 +                sendResponse(nodeId, topic, req.requestId(), e, false);
 +
 +                return;
 +            }
 +
 +            IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx,
 +                log,
 +                req.cacheName(),
 +                col,
 +                req.ignoreDeploymentOwnership(),
 +                req.skipStore(),
 +                updater);
 +
 +            Exception err = null;
 +
 +            try {
 +                job.call();
 +            }
 +            catch (Exception e) {
 +                U.error(log, "Failed to finish update job.", e);
 +
 +                err = e;
 +            }
 +
 +            sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
 +        }
 +        finally {
 +            busyLock.leaveBusy();
 +        }
 +    }
 +
 +    /**
 +     * Sends a data load response to the requesting node.
 +     * <p>
 +     * If the given error cannot be marshalled, a plain {@code IgniteCheckedException}
 +     * carrying the error's string form is sent instead, so that the requester still gets an
 +     * answer for this request. The response is dropped only if that fallback cannot be
 +     * marshalled either.
 +     *
 +     * @param nodeId Node ID.
 +     * @param resTopic Response topic.
 +     * @param reqId Request ID.
 +     * @param err Error, or {@code null} if the request was processed successfully.
 +     * @param forceLocDep Force local deployment.
 +     */
 +    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
 +        boolean forceLocDep) {
 +        byte[] errBytes;
 +
 +        try {
 +            errBytes = err != null ? marsh.marshal(err) : null;
 +        }
 +        catch (IgniteCheckedException e) {
 +            U.error(log, "Failed to marshal message.", e);
 +
 +            // Do not leave the requester without an answer: report the failure with an
 +            // exception that carries only the original error's string form.
 +            try {
 +                errBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error: " + err));
 +            }
 +            catch (IgniteCheckedException e0) {
 +                U.error(log, "Failed to marshal fallback error (response will not be sent).", e0);
 +
 +                return;
 +            }
 +        }
 +
 +        GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep);
 +
 +        try {
 +            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
 +        }
 +        catch (IgniteCheckedException e) {
 +            // A send failure is an error only while the requester is still alive.
 +            if (ctx.discovery().alive(nodeId))
 +                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
 +            else if (log.isDebugEnabled())
 +                log.debug("Node has left the grid: " + nodeId);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void printMemoryStats() {
 +        X.println(">>>");
 +        X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
 +        X.println(">>>   ldrsSize: " + ldrs.size());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
----------------------------------------------------------------------


Mime
View raw message