Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 597D51037B for ; Tue, 3 Mar 2015 18:40:21 +0000 (UTC) Received: (qmail 45663 invoked by uid 500); 3 Mar 2015 18:40:21 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 45632 invoked by uid 500); 3 Mar 2015 18:40:21 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 45623 invoked by uid 99); 3 Mar 2015 18:40:21 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 18:40:21 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 03 Mar 2015 18:40:16 +0000 Received: (qmail 44114 invoked by uid 99); 3 Mar 2015 18:39:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 18:39:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9F7D3E03B9; Tue, 3 Mar 2015 18:39:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 03 Mar 2015 18:39:56 -0000 Message-Id: <8d500badbd4d44f6a388da007a5b9ea2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] incubator-ignite git commit: # ignite-394: IgniteDataLoader -> IgniteDataStreamer.java + impl X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-ignite Updated Branches: refs/heads/ignite-394 [created] 9b33b6510 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java new file mode 100644 index 0000000..3f94752 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java @@ -0,0 +1,1453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.processors.dr.*; +import org.apache.ignite.internal.processors.portable.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.Map.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.GridTopic.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + +/** + * Data loader implementation. + */ +@SuppressWarnings("unchecked") +public class IgniteDataStreamerImpl implements IgniteDataStreamer, Delayed { + /** Isolated updater. */ + private static final Updater ISOLATED_UPDATER = new IsolatedUpdater(); + + /** Cache updater. */ + private Updater updater = ISOLATED_UPDATER; + + /** */ + private byte[] updaterBytes; + + /** Max remap count before issuing an error. */ + private static final int DFLT_MAX_REMAP_CNT = 32; + + /** Log reference. */ + private static final AtomicReference logRef = new AtomicReference<>(); + + /** Cache name ({@code null} for default cache). */ + private final String cacheName; + + /** Portable enabled flag. */ + private final boolean portableEnabled; + + /** + * If {@code true} then data will be transferred in compact format (only keys and values). + * Otherwise full map entry will be transferred (this is requires by DR internal logic). + */ + private final boolean compact; + + /** Per-node buffer size. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private int bufSize = DFLT_PER_NODE_BUFFER_SIZE; + + /** */ + private int parallelOps = DFLT_MAX_PARALLEL_OPS; + + /** */ + private long autoFlushFreq; + + /** Mapping. */ + @GridToStringInclude + private ConcurrentMap bufMappings = new ConcurrentHashMap8<>(); + + /** Logger. */ + private IgniteLogger log; + + /** Discovery listener. */ + private final GridLocalEventListener discoLsnr; + + /** Context. */ + private final GridKernalContext ctx; + + /** Communication topic for responses. */ + private final Object topic; + + /** */ + private byte[] topicBytes; + + /** {@code True} if data loader has been cancelled. */ + private volatile boolean cancelled; + + /** Active futures of this data loader. */ + @GridToStringInclude + private final Collection> activeFuts = new GridConcurrentHashSet<>(); + + /** Closure to remove from active futures. */ + @GridToStringExclude + private final IgniteInClosure> rmvActiveFut = new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture t) { + boolean rmv = activeFuts.remove(t); + + assert rmv; + } + }; + + /** Job peer deploy aware. */ + private volatile GridPeerDeployAware jobPda; + + /** Deployment class. */ + private Class depCls; + + /** Future to track loading finish. */ + private final GridFutureAdapter fut; + + /** Public API future to track loading finish. */ + private final IgniteFuture publicFut; + + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** Closed flag. */ + private final AtomicBoolean closed = new AtomicBoolean(); + + /** */ + private volatile long lastFlushTime = U.currentTimeMillis(); + + /** */ + private final DelayQueue> flushQ; + + /** */ + private boolean skipStore; + + /** */ + private int maxRemapCnt = DFLT_MAX_REMAP_CNT; + + /** + * @param ctx Grid kernal context. + * @param cacheName Cache name. + * @param flushQ Flush queue. + * @param compact If {@code true} data is transferred in compact mode (only keys and values). + * Otherwise full map entry will be transferred (this is required by DR internal logic). + */ + public IgniteDataStreamerImpl( + final GridKernalContext ctx, + @Nullable final String cacheName, + DelayQueue> flushQ, + boolean compact + ) { + assert ctx != null; + + this.ctx = ctx; + this.cacheName = cacheName; + this.flushQ = flushQ; + this.compact = compact; + + log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class); + + ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes()); + + if (node == null) + throw new IllegalStateException("Cache doesn't exist: " + cacheName); + + portableEnabled = ctx.portable().portableEnabled(node, cacheName); + + discoLsnr = new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; + + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + UUID id = discoEvt.eventNode().id(); + + // Remap regular mappings. + final Buffer buf = bufMappings.remove(id); + + if (buf != null) { + // Only async notification is possible since + // discovery thread may be trapped otherwise. + ctx.closure().callLocalSafe( + new Callable() { + @Override public Object call() throws Exception { + buf.onNodeLeft(); + + return null; + } + }, + true /* system pool */ + ); + } + } + }; + + ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); + + // Generate unique topic for this loader. + topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId())); + + ctx.io().addMessageListener(topic, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof GridDataLoadResponse; + + GridDataLoadResponse res = (GridDataLoadResponse)msg; + + if (log.isDebugEnabled()) + log.debug("Received data load response: " + res); + + Buffer buf = bufMappings.get(nodeId); + + if (buf != null) + buf.onResponse(res); + + else if (log.isDebugEnabled()) + log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", "); + } + }); + + if (log.isDebugEnabled()) + log.debug("Added response listener within topic: " + topic); + + fut = new GridDataLoaderFuture(ctx, this); + + publicFut = new IgniteFutureImpl<>(fut); + } + + /** + * Enters busy lock. + */ + private void enterBusy() { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Data loader has been closed."); + } + + /** + * Leaves busy lock. + */ + private void leaveBusy() { + busyLock.leaveBusy(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture future() { + return publicFut; + } + + /** + * @return Internal future. + */ + public IgniteInternalFuture internalFuture() { + return fut; + } + + /** {@inheritDoc} */ + @Override public void deployClass(Class depCls) { + this.depCls = depCls; + } + + /** {@inheritDoc} */ + @Override public void updater(Updater updater) { + A.notNull(updater, "updater"); + + this.updater = updater; + } + + /** {@inheritDoc} */ + @Override public boolean allowOverwrite() { + return updater != ISOLATED_UPDATER; + } + + /** {@inheritDoc} */ + @Override public void allowOverwrite(boolean allow) { + if (allow == allowOverwrite()) + return; + + ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes()); + + if (node == null) + throw new IgniteException("Failed to get node for cache: " + cacheName); + + updater = allow ? GridDataLoadCacheUpdaters.individual() : ISOLATED_UPDATER; + } + + /** {@inheritDoc} */ + @Override public boolean skipStore() { + return skipStore; + } + + /** {@inheritDoc} */ + @Override public void skipStore(boolean skipStore) { + this.skipStore = skipStore; + } + + /** {@inheritDoc} */ + @Override @Nullable public String cacheName() { + return cacheName; + } + + /** {@inheritDoc} */ + @Override public int perNodeBufferSize() { + return bufSize; + } + + /** {@inheritDoc} */ + @Override public void perNodeBufferSize(int bufSize) { + A.ensure(bufSize > 0, "bufSize > 0"); + + this.bufSize = bufSize; + } + + /** {@inheritDoc} */ + @Override public int perNodeParallelLoadOperations() { + return parallelOps; + } + + /** {@inheritDoc} */ + @Override public void perNodeParallelLoadOperations(int parallelOps) { + this.parallelOps = parallelOps; + } + + /** {@inheritDoc} */ + @Override public long autoFlushFrequency() { + return autoFlushFreq; + } + + /** {@inheritDoc} */ + @Override public void autoFlushFrequency(long autoFlushFreq) { + A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0"); + + long old = this.autoFlushFreq; + + if (autoFlushFreq != old) { + this.autoFlushFreq = autoFlushFreq; + + if (autoFlushFreq != 0 && old == 0) + flushQ.add(this); + else if (autoFlushFreq == 0) + flushQ.remove(this); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture addData(Map entries) throws IllegalStateException { + A.notNull(entries, "entries"); + + return addData(entries.entrySet()); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture addData(Collection> entries) { + A.notEmpty(entries, "entries"); + + enterBusy(); + + try { + GridFutureAdapter resFut = new GridFutureAdapter<>(ctx); + + resFut.listenAsync(rmvActiveFut); + + activeFuts.add(resFut); + + Collection keys = null; + + if (entries.size() > 1) { + keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1); + + for (Map.Entry entry : entries) + keys.add(entry.getKey()); + } + + load0(entries, resFut, keys, 0); + + return new IgniteFutureImpl<>(resFut); + } + catch (IgniteException e) { + return new IgniteFinishedFutureImpl<>(ctx, e); + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture addData(Map.Entry entry) { + A.notNull(entry, "entry"); + + return addData(F.asList(entry)); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture addData(K key, V val) { + A.notNull(key, "key"); + + return addData(new Entry0<>(key, val)); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture removeData(K key) { + return addData(key, null); + } + + /** + * @param entries Entries. + * @param resFut Result future. + * @param activeKeys Active keys. + * @param remaps Remaps count. + */ + private void load0( + Collection> entries, + final GridFutureAdapter resFut, + @Nullable final Collection activeKeys, + final int remaps + ) { + assert entries != null; + + Map>> mappings = new HashMap<>(); + + boolean initPda = ctx.deploy().enabled() && jobPda == null; + + for (Map.Entry entry : entries) { + List nodes; + + try { + K key = entry.getKey(); + + assert key != null; + + if (initPda) { + jobPda = new DataLoaderPda(key, entry.getValue(), updater); + + initPda = false; + } + + nodes = nodes(key); + } + catch (IgniteCheckedException e) { + resFut.onDone(e); + + return; + } + + if (F.isEmpty(nodes)) { + resFut.onDone(new ClusterTopologyException("Failed to map key to node " + + "(no nodes with cache found in topology) [infos=" + entries.size() + + ", cacheName=" + cacheName + ']')); + + return; + } + + for (ClusterNode node : nodes) { + Collection> col = mappings.get(node); + + if (col == null) + mappings.put(node, col = new ArrayList<>()); + + col.add(entry); + } + } + + for (final Map.Entry>> e : mappings.entrySet()) { + final UUID nodeId = e.getKey().id(); + + Buffer buf = bufMappings.get(nodeId); + + if (buf == null) { + Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey())); + + if (old != null) + buf = old; + } + + final Collection> entriesForNode = e.getValue(); + + IgniteInClosure> lsnr = new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture t) { + try { + t.get(); + + if (activeKeys != null) { + for (Map.Entry e : entriesForNode) + activeKeys.remove(e.getKey()); + + if (activeKeys.isEmpty()) + resFut.onDone(); + } + else { + assert entriesForNode.size() == 1; + + // That has been a single key, + // so complete result future right away. + resFut.onDone(); + } + } + catch (IgniteCheckedException e1) { + if (log.isDebugEnabled()) + log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']'); + + if (cancelled) { + resFut.onDone(new IgniteCheckedException("Data loader has been cancelled: " + + IgniteDataStreamerImpl.this, e1)); + } + else if (remaps + 1 > maxRemapCnt) { + resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): " + + remaps), e1); + } + else + load0(entriesForNode, resFut, activeKeys, remaps + 1); + } + } + }; + + GridFutureAdapter f; + + try { + f = buf.update(entriesForNode, lsnr); + } + catch (IgniteInterruptedCheckedException e1) { + resFut.onDone(e1); + + return; + } + + if (ctx.discovery().node(nodeId) == null) { + if (bufMappings.remove(nodeId, buf)) + buf.onNodeLeft(); + + if (f != null) + f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + + "(node has left): " + nodeId)); + } + } + } + + /** + * @param key Key to map. + * @return Nodes to send requests to. + * @throws IgniteCheckedException If failed. + */ + private List nodes(K key) throws IgniteCheckedException { + GridAffinityProcessor aff = ctx.affinity(); + + return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) : + Collections.singletonList(aff.mapKeyToNode(cacheName, key)); + } + + /** + * Performs flush. + * + * @throws IgniteCheckedException If failed. + */ + private void doFlush() throws IgniteCheckedException { + lastFlushTime = U.currentTimeMillis(); + + List activeFuts0 = null; + + int doneCnt = 0; + + for (IgniteInternalFuture f : activeFuts) { + if (!f.isDone()) { + if (activeFuts0 == null) + activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2)); + + activeFuts0.add(f); + } + else { + f.get(); + + doneCnt++; + } + } + + if (activeFuts0 == null || activeFuts0.isEmpty()) + return; + + while (true) { + Queue> q = null; + + for (Buffer buf : bufMappings.values()) { + IgniteInternalFuture flushFut = buf.flush(); + + if (flushFut != null) { + if (q == null) + q = new ArrayDeque<>(bufMappings.size() * 2); + + q.add(flushFut); + } + } + + if (q != null) { + assert !q.isEmpty(); + + boolean err = false; + + for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to flush buffer: " + e); + + err = true; + } + } + + if (err) + // Remaps needed - flush buffers. + continue; + } + + doneCnt = 0; + + for (int i = 0; i < activeFuts0.size(); i++) { + IgniteInternalFuture f = activeFuts0.get(i); + + if (f == null) + doneCnt++; + else if (f.isDone()) { + f.get(); + + doneCnt++; + + activeFuts0.set(i, null); + } + else + break; + } + + if (doneCnt == activeFuts0.size()) + return; + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public void flush() throws IgniteException { + enterBusy(); + + try { + doFlush(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + leaveBusy(); + } + } + + /** + * Flushes every internal buffer if buffer was flushed before passed in + * threshold. + *

+ * Does not wait for result and does not fail on errors assuming that this method + * should be called periodically. + */ + @Override public void tryFlush() throws IgniteInterruptedException { + if (!busyLock.enterBusy()) + return; + + try { + for (Buffer buf : bufMappings.values()) + buf.flush(); + + lastFlushTime = U.currentTimeMillis(); + } + catch (IgniteInterruptedCheckedException e) { + throw U.convertException(e); + } + finally { + leaveBusy(); + } + } + + /** + * @param cancel {@code True} to close with cancellation. + * @throws IgniteException If failed. + */ + @Override public void close(boolean cancel) throws IgniteException { + try { + closeEx(cancel); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * @param cancel {@code True} to close with cancellation. + * @throws IgniteCheckedException If failed. + */ + public void closeEx(boolean cancel) throws IgniteCheckedException { + if (!closed.compareAndSet(false, true)) + return; + + busyLock.block(); + + if (log.isDebugEnabled()) + log.debug("Closing data loader [ldr=" + this + ", cancel=" + cancel + ']'); + + IgniteCheckedException e = null; + + try { + // Assuming that no methods are called on this loader after this method is called. + if (cancel) { + cancelled = true; + + for (Buffer buf : bufMappings.values()) + buf.cancelAll(); + } + else + doFlush(); + + ctx.event().removeLocalEventListener(discoLsnr); + + ctx.io().removeMessageListener(topic); + } + catch (IgniteCheckedException e0) { + e = e0; + } + + fut.onDone(null, e); + + if (e != null) + throw e; + } + + /** + * @return {@code true} If the loader is closed. + */ + boolean isClosed() { + return fut.isDone(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + close(false); + } + + /** + * @return Max remap count. + */ + public int maxRemapCount() { + return maxRemapCnt; + } + + /** + * @param maxRemapCnt New max remap count. + */ + public void maxRemapCount(int maxRemapCnt) { + this.maxRemapCnt = maxRemapCnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteDataStreamerImpl.class, this); + } + + /** {@inheritDoc} */ + @Override public long getDelay(TimeUnit unit) { + return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + /** + * @return Next flush time. + */ + private long nextFlushTime() { + return lastFlushTime + autoFlushFreq; + } + + /** {@inheritDoc} */ + @Override public int compareTo(Delayed o) { + return nextFlushTime() > ((IgniteDataStreamerImpl)o).nextFlushTime() ? 1 : -1; + } + + /** + * + */ + private class Buffer { + /** Node. */ + private final ClusterNode node; + + /** Active futures. */ + private final Collection> locFuts; + + /** Buffered entries. */ + private List> entries; + + /** */ + @GridToStringExclude + private GridFutureAdapter curFut; + + /** Local node flag. */ + private final boolean isLocNode; + + /** ID generator. */ + private final AtomicLong idGen = new AtomicLong(); + + /** Active futures. */ + private final ConcurrentMap> reqs; + + /** */ + private final Semaphore sem; + + /** Closure to signal on task finish. */ + @GridToStringExclude + private final IgniteInClosure> signalC = new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture t) { + signalTaskFinished(t); + } + }; + + /** + * @param node Node. + */ + Buffer(ClusterNode node) { + assert node != null; + + this.node = node; + + locFuts = new GridConcurrentHashSet<>(); + reqs = new ConcurrentHashMap8<>(); + + // Cache local node flag. + isLocNode = node.equals(ctx.discovery().localNode()); + + entries = newEntries(); + curFut = new GridFutureAdapter<>(ctx); + curFut.listenAsync(signalC); + + sem = new Semaphore(parallelOps); + } + + /** + * @param newEntries Infos. + * @param lsnr Listener for the operation future. + * @throws IgniteInterruptedCheckedException If failed. + * @return Future for operation. + */ + @Nullable GridFutureAdapter update(Iterable> newEntries, + IgniteInClosure> lsnr) throws IgniteInterruptedCheckedException { + List> entries0 = null; + GridFutureAdapter curFut0; + + synchronized (this) { + curFut0 = curFut; + + curFut0.listenAsync(lsnr); + + for (Map.Entry entry : newEntries) + entries.add(entry); + + if (entries.size() >= bufSize) { + entries0 = entries; + + entries = newEntries(); + curFut = new GridFutureAdapter<>(ctx); + curFut.listenAsync(signalC); + } + } + + if (entries0 != null) { + submit(entries0, curFut0); + + if (cancelled) + curFut0.onDone(new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataStreamerImpl.this)); + } + + return curFut0; + } + + /** + * @return Fresh collection with some space for outgrowth. + */ + private List> newEntries() { + return new ArrayList<>((int)(bufSize * 1.2)); + } + + /** + * @return Future if any submitted. + * + * @throws IgniteInterruptedCheckedException If thread has been interrupted. + */ + @Nullable IgniteInternalFuture flush() throws IgniteInterruptedCheckedException { + List> entries0 = null; + GridFutureAdapter curFut0 = null; + + synchronized (this) { + if (!entries.isEmpty()) { + entries0 = entries; + curFut0 = curFut; + + entries = newEntries(); + curFut = new GridFutureAdapter<>(ctx); + curFut.listenAsync(signalC); + } + } + + if (entries0 != null) + submit(entries0, curFut0); + + // Create compound future for this flush. + GridCompoundFuture res = null; + + for (IgniteInternalFuture f : locFuts) { + if (res == null) + res = new GridCompoundFuture<>(ctx); + + res.add(f); + } + + for (IgniteInternalFuture f : reqs.values()) { + if (res == null) + res = new GridCompoundFuture<>(ctx); + + res.add(f); + } + + if (res != null) + res.markInitialized(); + + return res; + } + + /** + * Increments active tasks count. + * + * @throws IgniteInterruptedCheckedException If thread has been interrupted. + */ + private void incrementActiveTasks() throws IgniteInterruptedCheckedException { + U.acquire(sem); + } + + /** + * @param f Future that finished. + */ + private void signalTaskFinished(IgniteInternalFuture f) { + assert f != null; + + sem.release(); + } + + /** + * @param entries Entries to submit. + * @param curFut Current future. + * @throws IgniteInterruptedCheckedException If interrupted. + */ + private void submit(final Collection> entries, final GridFutureAdapter curFut) + throws IgniteInterruptedCheckedException { + assert entries != null; + assert !entries.isEmpty(); + assert curFut != null; + + incrementActiveTasks(); + + IgniteInternalFuture fut; + + if (isLocNode) { + fut = ctx.closure().callLocalSafe( + new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false); + + locFuts.add(fut); + + fut.listenAsync(new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture t) { + try { + boolean rmv = locFuts.remove(t); + + assert rmv; + + curFut.onDone(t.get()); + } + catch (IgniteCheckedException e) { + curFut.onDone(e); + } + } + }); + } + else { + byte[] entriesBytes; + + try { + if (compact) { + entriesBytes = ctx.config().getMarshaller() + .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null)); + } + else + entriesBytes = ctx.config().getMarshaller().marshal(entries); + + if (updaterBytes == null) { + assert updater != null; + + updaterBytes = ctx.config().getMarshaller().marshal(updater); + } + + if (topicBytes == null) + topicBytes = ctx.config().getMarshaller().marshal(topic); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal (request will not be sent).", e); + + return; + } + + GridDeployment dep = null; + GridPeerDeployAware jobPda0 = null; + + if (ctx.deploy().enabled()) { + try { + jobPda0 = jobPda; + + assert jobPda0 != null; + + dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader()); + + GridCacheAdapter cache = ctx.cache().internalCache(cacheName); + + if (cache != null) + cache.context().deploy().onEnter(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e); + + return; + } + + if (dep == null) + U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass()); + } + + long reqId = idGen.incrementAndGet(); + + fut = curFut; + + reqs.put(reqId, (GridFutureAdapter)fut); + + GridDataLoadRequest req = new GridDataLoadRequest( + reqId, + topicBytes, + cacheName, + updaterBytes, + entriesBytes, + true, + skipStore, + dep != null ? dep.deployMode() : null, + dep != null ? jobPda0.deployClass().getName() : null, + dep != null ? dep.userVersion() : null, + dep != null ? dep.participants() : null, + dep != null ? dep.classLoaderId() : null, + dep == null); + + try { + ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL); + + if (log.isDebugEnabled()) + log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']'); + } + catch (IgniteCheckedException e) { + if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) + ((GridFutureAdapter)fut).onDone(e); + else + ((GridFutureAdapter)fut).onDone(new ClusterTopologyCheckedException("Failed to send " + + "request (node has left): " + node.id())); + } + } + } + + /** + * + */ + void onNodeLeft() { + assert !isLocNode; + assert bufMappings.get(node.id()) != this; + + if (log.isDebugEnabled()) + log.debug("Forcibly completing futures (node has left): " + node.id()); + + Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " + + "(node has left): " + node.id()); + + for (GridFutureAdapter f : reqs.values()) + f.onDone(e); + + // Make sure to complete current future. + GridFutureAdapter curFut0; + + synchronized (this) { + curFut0 = curFut; + } + + curFut0.onDone(e); + } + + /** + * @param res Response. + */ + void onResponse(GridDataLoadResponse res) { + if (log.isDebugEnabled()) + log.debug("Received data load response: " + res); + + GridFutureAdapter f = reqs.remove(res.requestId()); + + if (f == null) { + if (log.isDebugEnabled()) + log.debug("Future for request has not been found: " + res.requestId()); + + return; + } + + Throwable err = null; + + byte[] errBytes = res.errorBytes(); + + if (errBytes != null) { + try { + GridPeerDeployAware jobPda0 = jobPda; + + err = ctx.config().getMarshaller().unmarshal( + errBytes, + jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader()); + } + catch (IgniteCheckedException e) { + f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e)); + + return; + } + } + + f.onDone(null, err); + + if (log.isDebugEnabled()) + log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']'); + } + + /** + * + */ + void cancelAll() { + IgniteCheckedException err = new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataStreamerImpl.this); + + for (IgniteInternalFuture f : locFuts) { + try { + f.cancel(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to cancel mini-future.", e); + } + } + + for (GridFutureAdapter f : reqs.values()) + f.onDone(err); + } + + /** {@inheritDoc} */ + @Override public String toString() { + int size; + + synchronized (this) { + size = entries.size(); + } + + return S.toString(Buffer.class, this, + "entriesCnt", size, + "locFutsSize", locFuts.size(), + "reqsSize", reqs.size()); + } + } + + /** + * Data loader peer-deploy aware. + */ + private class DataLoaderPda implements GridPeerDeployAware { + /** */ + private static final long serialVersionUID = 0L; + + /** Deploy class. */ + private Class cls; + + /** Class loader. */ + private ClassLoader ldr; + + /** Collection of objects to detect deploy class and class loader. */ + private Collection objs; + + /** + * Constructs data loader peer-deploy aware. + * + * @param objs Collection of objects to detect deploy class and class loader. + */ + private DataLoaderPda(Object... objs) { + this.objs = Arrays.asList(objs); + } + + /** {@inheritDoc} */ + @Override public Class deployClass() { + if (cls == null) { + Class cls0 = null; + + if (depCls != null) + cls0 = depCls; + else { + for (Iterator it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) { + Object o = it.next(); + + if (o != null) + cls0 = U.detectClass(o); + } + + if (cls0 == null || U.isJdk(cls0)) + cls0 = IgniteDataStreamerImpl.class; + } + + assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']'; + + cls = cls0; + } + + return cls; + } + + /** {@inheritDoc} */ + @Override public ClassLoader classLoader() { + if (ldr == null) { + ClassLoader ldr0 = deployClass().getClassLoader(); + + // Safety. + if (ldr0 == null) + ldr0 = U.gridClassLoader(); + + assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']'; + + ldr = ldr0; + } + + return ldr; + } + } + + /** + * Entry. + */ + private static class Entry0 implements Map.Entry, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private K key; + + /** */ + private V val; + + /** + * @param key Key. + * @param val Value. + */ + private Entry0(K key, @Nullable V val) { + assert key != null; + + this.key = key; + this.val = val; + } + + /** + * For {@link Externalizable}. + */ + @SuppressWarnings("UnusedDeclaration") + public Entry0() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public K getKey() { + return key; + } + + /** {@inheritDoc} */ + @Override public V getValue() { + return val; + } + + /** {@inheritDoc} */ + @Override public V setValue(V val) { + V old = this.val; + + this.val = val; + + return old; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(key); + out.writeObject(val); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + key = (K)in.readObject(); + val = (V)in.readObject(); + } + } + + /** + * Wrapper list with special compact serialization of map entries. + */ + private static class Entries0 extends AbstractCollection> implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Wrapped delegate. */ + private Collection> delegate; + + /** Optional portable processor for converting values. */ + private GridPortableProcessor portable; + + /** + * @param delegate Delegate. + * @param portable Portable processor. + */ + private Entries0(Collection> delegate, GridPortableProcessor portable) { + this.delegate = delegate; + this.portable = portable; + } + + /** + * For {@link Externalizable}. + */ + public Entries0() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Iterator> iterator() { + return delegate.iterator(); + } + + /** {@inheritDoc} */ + @Override public int size() { + return delegate.size(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(delegate.size()); + + boolean portableEnabled = portable != null; + + for (Map.Entry entry : delegate) { + if (portableEnabled) { + out.writeObject(portable.marshalToPortable(entry.getKey())); + out.writeObject(portable.marshalToPortable(entry.getValue())); + } + else { + out.writeObject(entry.getKey()); + out.writeObject(entry.getValue()); + } + } + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + int sz = in.readInt(); + + delegate = new ArrayList<>(sz); + + for (int i = 0; i < sz; i++) { + Object k = in.readObject(); + Object v = in.readObject(); + + delegate.add(new Entry0<>((K)k, (V)v)); + } + } + } + + /** + * Isolated updater which only loads entry initial value. + */ + private static class IsolatedUpdater implements Updater { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache cache, Collection> entries) { + IgniteCacheProxy proxy = (IgniteCacheProxy)cache; + + GridCacheAdapter internalCache = proxy.context().cache(); + + if (internalCache.isNear()) + internalCache = internalCache.context().near().dht(); + + GridCacheContext cctx = internalCache.context(); + + long topVer = cctx.affinity().affinityTopologyVersion(); + + GridCacheVersion ver = cctx.versions().next(topVer); + + boolean portable = cctx.portableEnabled(); + + for (Map.Entry e : entries) { + try { + K key = e.getKey(); + V val = e.getValue(); + + if (portable) { + key = (K)cctx.marshalToPortable(key); + val = (V)cctx.marshalToPortable(val); + } + + GridCacheEntryEx entry = internalCache.entryEx(key, topVer); + + entry.unswap(true, false); + + entry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer, + GridDrType.DR_LOAD); + + cctx.evicts().touch(entry, topVer); + } + catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) { + // No-op. + } + catch (IgniteCheckedException ex) { + IgniteLogger log = cache.unwrap(Ignite.class).log(); + + U.error(log, "Failed to set initial value for cache entry: " + e, ex); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java index b8cfe77..e34eb16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java @@ -31,7 +31,7 @@ import java.util.*; /** * Data center replication cache updater for data loader. */ -public class GridDrDataLoadCacheUpdater implements IgniteDataLoader.Updater { +public class GridDrDataLoadCacheUpdater implements IgniteDataStreamer.Updater { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index e960422..49b78f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -302,8 +302,8 @@ public class IgfsDataManager extends IgfsManager { * * @return New instance of data loader. */ - private IgniteDataLoader dataLoader() { - IgniteDataLoader ldr = + private IgniteDataStreamer dataLoader() { + IgniteDataStreamer ldr = igfsCtx.kernalContext().dataLoad().dataLoader(dataCachePrj.name()); IgfsConfiguration cfg = igfsCtx.configuration(); @@ -641,7 +641,7 @@ public class IgfsDataManager extends IgfsManager { ", cleanNonColocated=" + cleanNonColocated + ", startIdx=" + startIdx + ", endIdx=" + endIdx + ']'); try { - try (IgniteDataLoader ldr = dataLoader()) { + try (IgniteDataStreamer ldr = dataLoader()) { for (long idx = startIdx; idx <= endIdx; idx++) { ldr.removeData(new IgfsBlockKey(fileInfo.id(), range.affinityKey(), fileInfo.evictExclude(), idx)); @@ -667,7 +667,7 @@ public class IgfsDataManager extends IgfsManager { long endIdx = range.endOffset() / fileInfo.blockSize(); try { - try (IgniteDataLoader ldr = dataLoader()) { + try (IgniteDataStreamer ldr = dataLoader()) { long bytesProcessed = 0; for (long idx = startIdx; idx <= endIdx; idx++) { @@ -1705,7 +1705,7 @@ public class IgfsDataManager extends IgfsManager { break; } - IgniteDataLoader ldr = dataLoader(); + IgniteDataStreamer ldr = dataLoader(); try { IgfsFileMap map = fileInfo.fileMap(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java index 4f5da0b..b60b76b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java @@ -99,7 +99,7 @@ public abstract class IgniteTxConsistencyRestartAbstractSelfTest extends GridCom public void testTxConsistency() throws Exception { startGridsMultiThreaded(GRID_CNT); - IgniteDataLoader ldr = grid(0).dataLoader(null); + IgniteDataStreamer ldr = grid(0).dataLoader(null); for (int i = 0; i < RANGE; i++) { ldr.addData(i, 0); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java index 8950f45..414fc42 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java @@ -140,7 +140,7 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac * @param g Grid. */ private static void realTimePopulate(final Ignite g) { - try (IgniteDataLoader ldr = g.dataLoader(null)) { + try (IgniteDataStreamer ldr = g.dataLoader(null)) { // Sets max values to 1 so cache metrics have correct values. ldr.perNodeParallelLoadOperations(1); @@ -155,7 +155,7 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac /** * Increments value for key. */ - private static class IncrementingUpdater implements IgniteDataLoader.Updater { + private static class IncrementingUpdater implements IgniteDataStreamer.Updater { /** */ private static final EntryProcessor INC = new EntryProcessor() { @Override public Void process(MutableEntry e, Object... args) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java index ea8f060..ecb2411 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java @@ -108,7 +108,7 @@ public class GridCacheLruNearEvictionPolicySelfTest extends GridCommonAbstractTe info("Inserting " + cnt + " keys to cache."); - try (IgniteDataLoader ldr = grid(0).dataLoader(null)) { + try (IgniteDataStreamer ldr = grid(0).dataLoader(null)) { for (int i = 0; i < cnt; i++) ldr.addData(i, Integer.toString(i)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java index a381082..5ac6dae 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java @@ -141,7 +141,7 @@ public class GridCacheNearOnlyLruNearEvictionPolicySelfTest extends GridCommonAb info("Inserting " + cnt + " keys to cache."); - try (IgniteDataLoader ldr = grid(1).dataLoader(null)) { + try (IgniteDataStreamer ldr = grid(1).dataLoader(null)) { for (int i = 0; i < cnt; i++) ldr.addData(i, Integer.toString(i)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java index e433856..501e56c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.dataload; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -88,7 +87,7 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest { Ignite g4 = grid(4); - IgniteDataLoader dataLdr = g4.dataLoader(null); + IgniteDataStreamer dataLdr = g4.dataLoader(null); dataLdr.perNodeBufferSize(32); @@ -135,7 +134,7 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest { else fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName()); - IgniteDataLoader dataLdr = g0.dataLoader(null); + IgniteDataStreamer dataLdr = g0.dataLoader(null); Map map = U.newHashMap(KEYS_COUNT); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java index 1945912..fc1a1cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java @@ -137,7 +137,7 @@ public class GridDataLoaderPerformanceTest extends GridCommonAbstractTest { Ignite ignite = startGrid(); - final IgniteDataLoader ldr = ignite.dataLoader(null); + final IgniteDataStreamer ldr = ignite.dataLoader(null); ldr.perNodeBufferSize(8192); ldr.updater(GridDataLoadCacheUpdaters.batchedSorted()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java index a666423..5959957 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java @@ -19,16 +19,15 @@ package org.apache.ignite.internal.processors.dataload; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.GridCache; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.spi.discovery.tcp.*; @@ -37,7 +36,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.junits.common.*; import org.jetbrains.annotations.*; -import javax.cache.Cache; +import javax.cache.*; import javax.cache.configuration.*; import java.util.*; import java.util.concurrent.*; @@ -177,7 +176,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { Ignite g2 = startGrid(2); startGrid(3); - final IgniteDataLoader ldr = g1.dataLoader(null); + final IgniteDataStreamer ldr = g1.dataLoader(null); ldr.updater(GridDataLoadCacheUpdaters.batchedSorted()); @@ -219,7 +218,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { assertEquals(total, s2 + s3); - final IgniteDataLoader rmvLdr = g2.dataLoader(null); + final IgniteDataStreamer rmvLdr = g2.dataLoader(null); rmvLdr.updater(GridDataLoadCacheUpdaters.batchedSorted()); @@ -299,7 +298,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { final int cnt = 40_000; final int threads = 10; - try (final IgniteDataLoader ldr = g1.dataLoader(null)) { + try (final IgniteDataStreamer ldr = g1.dataLoader(null)) { final AtomicInteger idxGen = new AtomicInteger(); IgniteInternalFuture f1 = multithreadedAsync(new Callable() { @@ -359,7 +358,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4}, new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8}); - IgniteDataLoader dataLdr = g1.dataLoader(null); + IgniteDataStreamer dataLdr = g1.dataLoader(null); for (int i = 0, size = arrays.size(); i < 1000; i++) { Object arr = arrays.get(i % size); @@ -417,7 +416,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { Ignite g1 = grid(1); // Get and configure loader. - final IgniteDataLoader ldr = g1.dataLoader(null); + final IgniteDataStreamer ldr = g1.dataLoader(null); ldr.updater(GridDataLoadCacheUpdaters.individual()); ldr.perNodeBufferSize(2); @@ -519,7 +518,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { try { Ignite g1 = startGrid(1); - IgniteDataLoader ldr = g1.dataLoader(null); + IgniteDataStreamer ldr = g1.dataLoader(null); ldr.close(false); @@ -677,7 +676,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { final IgniteCache c = g.jcache(null); - final IgniteDataLoader ldr = g.dataLoader(null); + final IgniteDataStreamer ldr = g.dataLoader(null); ldr.perNodeBufferSize(10); @@ -729,7 +728,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { IgniteCache c = g.jcache(null); - IgniteDataLoader ldr = g.dataLoader(null); + IgniteDataStreamer ldr = g.dataLoader(null); ldr.perNodeBufferSize(10); @@ -776,7 +775,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { assertTrue(c.localSize() == 0); - IgniteDataLoader ldr = g.dataLoader(null); + IgniteDataStreamer ldr = g.dataLoader(null); ldr.perNodeBufferSize(10); ldr.autoFlushFrequency(3000); @@ -821,7 +820,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { for (int i = 0; i < 1000; i++) storeMap.put(i, i); - try (IgniteDataLoader ldr = ignite.dataLoader(null)) { + try (IgniteDataStreamer ldr = ignite.dataLoader(null)) { ldr.allowOverwrite(true); assertFalse(ldr.skipStore()); @@ -839,7 +838,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { for (int i = 1000; i < 2000; i++) assertEquals(i, storeMap.get(i)); - try (IgniteDataLoader ldr = ignite.dataLoader(null)) { + try (IgniteDataStreamer ldr = ignite.dataLoader(null)) { ldr.allowOverwrite(true); ldr.skipStore(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java index e8ffedf..769f201 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java @@ -180,7 +180,7 @@ public class GridTestMain { ExecutorCompletionService execSvc = new ExecutorCompletionService<>(Executors.newFixedThreadPool(numThreads)); - try (IgniteDataLoader ldr = G.ignite().dataLoader("partitioned")) { + try (IgniteDataStreamer ldr = G.ignite().dataLoader("partitioned")) { for (int i = 0; i < numThreads; i++) { final int threadId = i; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java index 8615e10..4ec0d82 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java @@ -40,7 +40,7 @@ public class GridGcTimeoutTest { public static void main(String[] args) { Ignite g = G.start(U.resolveIgniteUrl(CFG_PATH)); - IgniteDataLoader ldr = g.dataLoader(null); + IgniteDataStreamer ldr = g.dataLoader(null); ldr.perNodeBufferSize(16 * 1024); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java index 4e6d9b5..50c7eba 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java @@ -33,7 +33,7 @@ public class GridContinuousMapperLoadTest1 { try (Ignite g = G.start("examples/config/example-cache.xml")) { int max = 30000; - IgniteDataLoader ldr = g.dataLoader("replicated"); + IgniteDataStreamer ldr = g.dataLoader("replicated"); for (int i = 0; i < max; i++) ldr.addData(i, new TestObject(i, "Test object: " + i)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java index c9ba9ab..c6f364a 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java @@ -57,7 +57,7 @@ public class GridContinuousMapperLoadTest2 { try { int max = 20000; - IgniteDataLoader ldr = g.dataLoader("replicated"); + IgniteDataStreamer ldr = g.dataLoader("replicated"); for (int i = 0; i < max; i++) ldr.addData(i, new TestObject(i, "Test object: " + i)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java index 58478d3..d4dcf22 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java @@ -22,7 +22,7 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.marshaller.*; import org.apache.ignite.plugin.*; import org.jetbrains.annotations.*; @@ -171,7 +171,7 @@ public class IgniteMock implements Ignite { } /** {@inheritDoc} */ - @Override public IgniteDataLoader dataLoader(@Nullable String cacheName) { + @Override public IgniteDataStreamer dataLoader(@Nullable String cacheName) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala b/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala index 6f5553b..53d6f26 100644 --- a/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala +++ b/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala @@ -17,9 +17,6 @@ package org.apache.ignite.scalar -import java.net.URL -import java.util.UUID - import org.apache.ignite._ import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField} import org.apache.ignite.cluster.ClusterNode @@ -27,6 +24,9 @@ import org.apache.ignite.configuration.IgniteConfiguration import org.apache.ignite.internal.IgniteVersionUtils._ import org.jetbrains.annotations.Nullable +import java.net.URL +import java.util.UUID + import scala.annotation.meta.field /** @@ -294,7 +294,7 @@ object scalar extends ScalarConversions { */ @inline def dataLoader$[K, V]( @Nullable cacheName: String, - bufSize: Int): IgniteDataLoader[K, V] = { + bufSize: Int): IgniteDataStreamer[K, V] = { val dl = ignite$.dataLoader[K, V](cacheName) dl.perNodeBufferSize(bufSize) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java index 31aa9e5..f9f6e9e 100644 --- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java +++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java @@ -247,7 +247,7 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea } /** {@inheritDoc} */ - @Override public IgniteDataLoader dataLoader(@Nullable String cacheName) { + @Override public IgniteDataStreamer dataLoader(@Nullable String cacheName) { assert g != null; return g.dataLoader(cacheName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java index e26ab9f..b70d618 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java @@ -40,7 +40,7 @@ public class IgniteSqlQueryBenchmark extends IgniteCacheAbstractBenchmark { long start = System.nanoTime(); - try (IgniteDataLoader dataLdr = ignite().dataLoader(cache.getName())) { + try (IgniteDataStreamer dataLdr = ignite().dataLoader(cache.getName())) { for (int i = 0; i < args.range() && !Thread.currentThread().isInterrupted(); i++) { dataLdr.addData(i, new Person(i, "firstName" + i, "lastName" + i, i * 1000)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java index 4e34d14..f6ba47a 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java @@ -39,7 +39,7 @@ public class IgniteSqlQueryJoinBenchmark extends IgniteCacheAbstractBenchmark { long start = System.nanoTime(); - try (IgniteDataLoader dataLdr = ignite().dataLoader(cache.getName())) { + try (IgniteDataStreamer dataLdr = ignite().dataLoader(cache.getName())) { final int orgRange = args.range() / 10; // Populate organizations.