Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3A5101878A for ; Mon, 13 Jul 2015 06:12:23 +0000 (UTC) Received: (qmail 73574 invoked by uid 500); 13 Jul 2015 06:12:23 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 73545 invoked by uid 500); 13 Jul 2015 06:12:23 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 73535 invoked by uid 99); 13 Jul 2015 06:12:23 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Jul 2015 06:12:23 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 88D2BD415C for ; Mon, 13 Jul 2015 06:12:22 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.376 X-Spam-Level: X-Spam-Status: No, score=0.376 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.405, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id UwhkZxJWRv4w for ; Mon, 13 Jul 2015 06:12:13 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id D67B343DF7 for ; Mon, 13 Jul 2015 06:12:05 +0000 (UTC) Received: (qmail 71043 invoked by uid 99); 13 Jul 2015 06:12:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Jul 2015 06:12:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 19F5BDFE12; Mon, 13 Jul 2015 06:12:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 13 Jul 2015 06:12:09 -0000 Message-Id: <1f041acef47941ee9507aa6bc890bbb5@git.apache.org> In-Reply-To: <17a58177518c463895f779b89fc4751f@git.apache.org> References: <17a58177518c463895f779b89fc4751f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/9] incubator-ignite git commit: # ignite-648: Implemented. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8218fe6f/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java new file mode 100644 index 0000000..b15b6ef --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java @@ -0,0 +1,602 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testframework.junits.multijvm; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.mxbean.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.CacheManager; +import javax.cache.configuration.*; +import javax.cache.expiry.*; +import javax.cache.integration.*; +import javax.cache.processor.*; +import java.util.*; +import java.util.concurrent.locks.*; + +/** + * Ignite cache proxy for ignite instance at another JVM. + */ +@SuppressWarnings("TransientFieldInNonSerializableClass") +public class IgniteCacheProcessProxy implements IgniteCache { + /** Compute. */ + private final transient IgniteCompute compute; + + /** Cache name. */ + private final String cacheName; + + /** Grid id. */ + private final UUID gridId; + + /** With async. */ + private final boolean isAsync; + + /** Ignite proxy. */ + private final transient IgniteProcessProxy igniteProxy; + + /** + * @param name Name. + * @param proxy Ignite Process Proxy. + */ + public IgniteCacheProcessProxy(String name, IgniteProcessProxy proxy) { + this(name, false, proxy); + } + + /** + * @param name Name. + * @param async + * @param proxy Ignite Process Proxy. + */ + public IgniteCacheProcessProxy(String name, boolean async, IgniteProcessProxy proxy) { + cacheName = name; + isAsync = async; + gridId = proxy.getId(); + igniteProxy = proxy; + compute = proxy.remoteCompute(); + } + + /** + * Returns cache instance. Method to be called from closure at another JVM. + * + * @return Cache. + */ + private IgniteCache cache() { + IgniteCache cache = Ignition.ignite(gridId).cache(cacheName); + + if (isAsync) + cache = cache.withAsync(); + + return cache; + } + + /** {@inheritDoc} */ + @Override public IgniteCache withAsync() { + return new IgniteCacheProcessProxy<>(cacheName, true, igniteProxy); + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + return isAsync; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture future() { + // Return fake future. Future should be called in the same place where operation done. + return new IgniteFinishedFutureImpl<>(); + } + + /** {@inheritDoc} */ + @Override public > C getConfiguration(final Class clazz) { + final Class cl = clazz; + + return (C)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().getConfiguration(cl); + } + }); + } + + /** {@inheritDoc} */ + @Override public Entry randomEntry() { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public IgniteCache withExpiryPolicy(ExpiryPolicy plc) { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public IgniteCache withSkipStore() { + throw new UnsupportedOperationException("Method should be supported."); + } + + @Override public IgniteCache withNoRetries() { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public void loadCache(@Nullable IgniteBiPredicate p, @Nullable Object... args) throws CacheException { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public void localLoadCache(@Nullable final IgniteBiPredicate p, @Nullable final Object... args) throws CacheException { + final IgniteBiPredicate pCopy = p; + + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().localLoadCache(pCopy, args); + } + }); + } + + /** {@inheritDoc} */ + @Override public V getAndPutIfAbsent(final K key, final V val) throws CacheException { + return (V)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().getAndPutIfAbsent(key, val); + } + }); + } + + /** {@inheritDoc} */ + @Override public Lock lock(K key) { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public Lock lockAll(Collection keys) { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public boolean isLocalLocked(final K key, final boolean byCurrThread) { + return compute.call(new IgniteCallable() { + @Override public Boolean call() throws Exception { + return cache().isLocalLocked(key, byCurrThread); + } + }); + } + + /** {@inheritDoc} */ + @Override public QueryCursor query(Query qry) { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Iterable> localEntries(final CachePeekMode... peekModes) throws CacheException { + return (Iterable>)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + Collection res = new ArrayList<>(); + + for (Entry e : cache().localEntries(peekModes)) + res.add(e); + + return res; + } + }); + } + + /** {@inheritDoc} */ + @Override public QueryMetrics queryMetrics() { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public void localEvict(final Collection keys) { + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().localEvict(keys); + } + }); + } + + /** {@inheritDoc} */ + @Override public V localPeek(final K key, final CachePeekMode... peekModes) { + return (V)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().localPeek(key, peekModes); + } + }); + } + + /** {@inheritDoc} */ + @Override public void localPromote(Set keys) throws CacheException { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public int size(final CachePeekMode... peekModes) throws CacheException { + return (int)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().size(peekModes); + } + }); + } + + /** {@inheritDoc} */ + @Override public int localSize(final CachePeekMode... peekModes) { + return (int)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().localSize(peekModes); + } + }); + } + + /** {@inheritDoc} */ + @Override public Map> invokeAll(Map> map, + Object... args) { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public V get(final K key) { + return (V)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().get(key); + } + }); + } + + /** {@inheritDoc} */ + @Override public Map getAll(final Set keys) { + return (Map)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().getAll(keys); + } + }); + } + + @Override public Map getAllOutTx(final Set keys) { + return (Map)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().getAllOutTx(keys); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean containsKey(final K key) { + return (boolean)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().containsKey(key); + } + }); + } + + /** {@inheritDoc} */ + @Override public void loadAll(Set keys, boolean replaceExistingValues, CompletionListener completionLsnr) { + throw new UnsupportedOperationException("Oparetion can't be supported automatically."); + } + + /** {@inheritDoc} */ + @Override public boolean containsKeys(final Set keys) { + return (boolean)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().containsKeys(keys); + } + }); + } + + /** {@inheritDoc} */ + @Override public void put(final K key, final V val) {; + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().put(key, val); + } + }); + } + + /** {@inheritDoc} */ + @Override public V getAndPut(final K key, final V val) { + return (V)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().getAndPut(key, val); + } + }); + } + + /** {@inheritDoc} */ + @Override public void putAll(final Map map) { + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().putAll(map); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean putIfAbsent(final K key, final V val) { + return (boolean)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().putIfAbsent(key, val); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean remove(final K key) { + return (boolean)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().remove(key); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean remove(final K key, final V oldVal) { + return (boolean)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().remove(key, oldVal); + } + }); + } + + /** {@inheritDoc} */ + @Override public V getAndRemove(final K key) { + return (V)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().getAndRemove(key); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean replace(final K key, final V oldVal, final V newVal) { + return (boolean)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().replace(key, oldVal, newVal); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean replace(final K key, final V val) { + return (boolean)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().replace(key, val); + } + }); + } + + /** {@inheritDoc} */ + @Override public V getAndReplace(final K key, final V val) { + return (V)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().getAndReplace(key, val); + } + }); + } + + /** {@inheritDoc} */ + @Override public void removeAll(final Set keys) { + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().removeAll(keys); + } + }); + } + + /** {@inheritDoc} */ + @Override public void removeAll() { + compute.run(new IgniteRunnable() { + @Override public void run() { + IgniteCache cache = cache(); + + cache.removeAll(); + + if (isAsync) + cache.future().get(); + } + }); + } + + /** {@inheritDoc} */ + @Override public void clear() { + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().clear(); + } + }); + } + + /** {@inheritDoc} */ + @Override public void clear(final K key) { + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().clear(key); + } + }); + } + + /** {@inheritDoc} */ + @Override public void clearAll(final Set keys) { + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().clearAll(keys); + } + }); + } + + /** {@inheritDoc} */ + @Override public void localClear(final K key) { + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().localClear(key); + } + }); + } + + /** {@inheritDoc} */ + @Override public void localClearAll(final Set keys) { + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().localClearAll(keys); + } + }); + } + + /** {@inheritDoc} */ + @Override public T invoke(final K key, final EntryProcessor entryProcessor, final Object... arguments) { + return (T)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().invoke(key, + (EntryProcessor)entryProcessor, arguments); + } + }); + } + + /** {@inheritDoc} */ + @Override public T invoke(final K key, final CacheEntryProcessor entryProcessor, final Object... arguments) { + return (T)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().invoke(key, + (CacheEntryProcessor)entryProcessor, arguments); + } + }); + } + + /** {@inheritDoc} */ + @Override public Map> invokeAll(final Set keys, final EntryProcessor entryProcessor, + final Object... args) { + return (Map>)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().invokeAll(keys, + (EntryProcessor)entryProcessor, args); + } + }); + } + + /** {@inheritDoc} */ + @Override public String getName() { + return (String)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().getName(); + } + }); + } + + /** {@inheritDoc} */ + @Override public CacheManager getCacheManager() { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public void close() { + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().close(); + } + }); + } + + /** {@inheritDoc} */ + @Override public void destroy() { + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().destroy(); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean isClosed() { + return (boolean)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().isClosed(); + } + }); + } + + /** {@inheritDoc} */ + @Override public T unwrap(final Class clazz) { + if (Ignite.class.equals(clazz)) + return (T)igniteProxy; + + try { + return (T)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + return cache().unwrap(clazz); + } + }); + } + catch (Exception e) { + throw new IllegalArgumentException("Looks like class " + clazz + " is unmarshallable. Exception type:" + e.getClass(), e); + } + } + + /** {@inheritDoc} */ + @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration cacheEntryListenerConfiguration) { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration cacheEntryListenerConfiguration) { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public Iterator> iterator() { + final Collection> col = (Collection>)compute.call(new IgniteCallable() { + @Override public Object call() throws Exception { + Collection res = new ArrayList(); + + for (Object o : cache()) + res.add(o); + + return res; + } + }); + + return col.iterator(); + } + + /** {@inheritDoc} */ + @Override public Map> invokeAll(Set keys, + CacheEntryProcessor entryProcessor, Object... args) { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture rebalance() { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public CacheMetrics metrics() { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public CacheMetrics metrics(ClusterGroup grp) { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ + @Override public CacheMetricsMXBean mxBean() { + throw new UnsupportedOperationException("Method should be supported."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8218fe6f/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java new file mode 100644 index 0000000..159c451 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testframework.junits.multijvm; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Proxy class for cluster at another JVM. + */ +@SuppressWarnings("TransientFieldInNonSerializableClass") +public class IgniteClusterProcessProxy implements IgniteClusterEx { + /** Grid id. */ + private final UUID gridId; + + /** Compute. */ + private final transient IgniteCompute compute; + + /** */ + private final IgniteProcessProxy proxy; + + /** + * @param proxy Ignite Proxy. + */ + public IgniteClusterProcessProxy(IgniteProcessProxy proxy) { + this.proxy = proxy; + gridId = proxy.getId(); + compute = proxy.remoteCompute(); + } + + /** + * Returns cluster instance. Method to be called from closure at another JVM. + * + * @return Cache. + */ + private IgniteClusterEx cluster() { + return (IgniteClusterEx)Ignition.ignite(gridId).cluster(); + } + + /** {@inheritDoc} */ + @Override public ClusterGroupEx forSubjectId(final UUID subjId) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forCacheNodes(@Nullable String cacheName, boolean affNodes, boolean nearNodes, + boolean clientNodes) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + return compute.call(new IgniteCallable() { + @Override public ClusterNode call() throws Exception { + return cluster().localNode(); + } + }); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forLocal() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ConcurrentMap nodeLocalMap() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public long topologyVersion() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public Collection topology(long topVer) throws UnsupportedOperationException { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public Map> mapKeysToNodes(@Nullable String cacheName, + @Nullable Collection keys) throws IgniteException { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteException { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public Collection startNodes(File file, boolean restart, int timeout, + int maxConn) throws IgniteException { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public Collection startNodes(Collection> hosts, + @Nullable Map dflts, boolean restart, int timeout, int maxConn) throws IgniteException { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public void stopNodes() throws IgniteException { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public void stopNodes(Collection ids) throws IgniteException { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public void restartNodes() throws IgniteException { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public void restartNodes(Collection ids) throws IgniteException { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCluster withAsync() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture future() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public Ignite ignite() { + return proxy; + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forNodes(Collection nodes) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forNode(ClusterNode node, ClusterNode... nodes) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forOthers(ClusterNode node, ClusterNode... nodes) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forOthers(ClusterGroup prj) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forNodeIds(Collection ids) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forNodeId(UUID id, UUID... ids) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forPredicate(IgnitePredicate p) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forAttribute(String name, @Nullable Object val) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forServers() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forClients() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forCacheNodes(String cacheName) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forDataNodes(String cacheName) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forClientNodes(String cacheName) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forRemotes() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forHost(ClusterNode node) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forHost(String host, String... hosts) { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forDaemons() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forRandom() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forOldest() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forYoungest() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public Collection nodes() { + return compute.call(new IgniteCallable>() { + @Override public Collection call() throws Exception { + return cluster().nodes(); + } + }); + } + + /** {@inheritDoc} */ + @Override public ClusterNode node(final UUID nid) { + return compute.call(new IgniteCallable() { + @Override public ClusterNode call() throws Exception { + return cluster().node(nid); + } + }); + } + + /** {@inheritDoc} */ + @Override public ClusterNode node() { + return compute.call(new IgniteCallable() { + @Override public ClusterNode call() throws Exception { + return cluster().node(); + } + }); + } + + /** {@inheritDoc} */ + @Override public Collection hostNames() { + return compute.call(new IgniteCallable>() { + @Override public Collection call() throws Exception { + return cluster().hostNames(); + } + }); + } + + /** {@inheritDoc} */ + @Override public IgnitePredicate predicate() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterMetrics metrics() throws IgniteException { + throw new UnsupportedOperationException("Operation is not supported yet."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8218fe6f/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java new file mode 100644 index 0000000..018aa8d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testframework.junits.multijvm; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Ignite events proxy for ignite instance at another JVM. + */ +@SuppressWarnings("TransientFieldInNonSerializableClass") +public class IgniteEventsProcessProxy implements IgniteEvents { + /** Ignite proxy. */ + private final transient IgniteProcessProxy igniteProxy; + + /** Grid id. */ + private final UUID gridId; + + /** + * @param igniteProxy Ignite proxy. + */ + public IgniteEventsProcessProxy(IgniteProcessProxy igniteProxy) { + this.igniteProxy = igniteProxy; + + gridId = igniteProxy.getId(); + } + + /** + * @return Events instance. + */ + private IgniteEvents events() { + return Ignition.ignite(gridId).events(); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup clusterGroup() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public List remoteQuery(IgnitePredicate p, long timeout, + @Nullable int... types) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public UUID remoteListen(@Nullable IgniteBiPredicate locLsnr, + @Nullable IgnitePredicate rmtFilter, @Nullable int... types) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public UUID remoteListen(int bufSize, long interval, boolean autoUnsubscribe, + @Nullable IgniteBiPredicate locLsnr, @Nullable IgnitePredicate rmtFilter, + @Nullable int... types) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public void stopRemoteListen(UUID opId) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public T waitForLocal(@Nullable IgnitePredicate filter, + @Nullable int... types) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public Collection localQuery(IgnitePredicate p, @Nullable int... types) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public void recordLocal(Event evt) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public void localListen(final IgnitePredicate lsnr, final int... types) { + igniteProxy.remoteCompute().run(new IgniteRunnable() { + @Override public void run() { + events().localListen(lsnr, types); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean stopLocalListen(IgnitePredicate lsnr, @Nullable int... types) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public void enableLocal(int... types) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public void disableLocal(int... types) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public int[] enabledEvents() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public boolean isEnabled(int type) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteEvents withAsync() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture future() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8218fe6f/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java new file mode 100644 index 0000000..2703d2b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testframework.junits.multijvm; + +import com.thoughtworks.xstream.*; +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import sun.jvmstat.monitor.*; + +import java.io.*; +import java.lang.reflect.*; +import java.util.*; + +/** + * Run ignite node. + */ +public class IgniteNodeRunner { + /** */ + private static final String IGNITE_CONFIGURATION_FILE = System.getProperty("java.io.tmpdir") + + File.separator + "igniteConfiguration.tmp_"; + + /** */ + private static volatile Ignite ignite; + + /** + * Starts {@link Ignite} instance accorging to given arguments. + * + * @param args Arguments. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + X.println(GridJavaProcess.PID_MSG_PREFIX + U.jvmPid()); + + X.println("Starting Ignite Node... Args=" + Arrays.toString(args)); + + IgniteConfiguration cfg = readCfgFromFileAndDeleteFile(args[0]); + + ignite = Ignition.start(cfg); + } + + /** + * @return Ignite instance started at main. + */ + public static IgniteEx startedInstance(){ + return (IgniteEx)ignite; + } + + /** + * @return True if there is ignite node started via {@link IgniteNodeRunner} at this JVM. + */ + public static boolean hasStartedInstance() { + return ignite != null; + } + + /** + * Stores {@link IgniteConfiguration} to file as xml. + * + * @param cfg Ignite Configuration. + * @return A name of file where the configuration was stored. + * @throws IOException If failed. + * @see #readCfgFromFileAndDeleteFile(String) + */ + public static String storeToFile(IgniteConfiguration cfg) throws IOException { + String fileName = IGNITE_CONFIGURATION_FILE + cfg.getNodeId(); + + // Check marshaller configuration, because read configuration method expect specific marshaller. + if (cfg.getMarshaller() instanceof OptimizedMarshaller){ + OptimizedMarshaller marsh = (OptimizedMarshaller)cfg.getMarshaller(); + + try { + Field isRequireFiled = marsh.getClass().getDeclaredField("requireSer"); + + isRequireFiled.setAccessible(true); + + boolean isRequireSer = isRequireFiled.getBoolean(marsh); + + if (isRequireSer) + throw new UnsupportedOperationException("Unsupported marshaller configuration. " + + "readCfgFromFileAndDeleteFile method expect " + OptimizedMarshaller.class.getSimpleName() + + "with requireSerializeble flag in 'false'."); + } + catch (NoSuchFieldException|IllegalAccessException e) { + throw new IgniteException("Failed to check filed of " + OptimizedMarshaller.class.getSimpleName(), e); + } + } + else + throw new UnsupportedOperationException("Unsupported marshaller. " + + "readCfgFromFileAndDeleteFile method expect " + OptimizedMarshaller.class.getSimpleName()); + + try(OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName))) { + cfg.setMBeanServer(null); + cfg.setMarshaller(null); + cfg.setDiscoverySpi(null); + cfg.setGridLogger(null); + + new XStream().toXML(cfg, out); + } + + return fileName; + } + + /** + * Reads configuration from given file and delete the file after. + * + * @param fileName File name. + * @return Readed configuration. + * @throws IOException If failed. + * @see #storeToFile(IgniteConfiguration) + */ + private static IgniteConfiguration readCfgFromFileAndDeleteFile(String fileName) throws IOException { + try(BufferedReader cfgReader = new BufferedReader(new FileReader(fileName))) { + IgniteConfiguration cfg = (IgniteConfiguration)new XStream().fromXML(cfgReader); + + cfg.setMarshaller(new OptimizedMarshaller(false)); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER); + cfg.setDiscoverySpi(disco); + + return cfg; + } + finally { + new File(fileName).delete(); + } + } + + /** + * Kill all Jvm runned by {#link IgniteNodeRunner}. Works based on jps command. + * + * @return List of killed process ids. + * @throws Exception If exception. + */ + public static List killAll() throws Exception{ + MonitoredHost monitoredHost = MonitoredHost.getMonitoredHost(new HostIdentifier("localhost")); + + Set jvms = monitoredHost.activeVms(); + + List res = new ArrayList<>(); + + for (Integer jvmId : jvms) { + try { + MonitoredVm vm = monitoredHost.getMonitoredVm(new VmIdentifier("//" + jvmId + "?mode=r"), 0); + + if (IgniteNodeRunner.class.getName().equals(MonitoredVmUtil.mainClass(vm, true))) { + Process killProc = U.isWindows() ? + Runtime.getRuntime().exec(new String[] {"taskkill", "/pid", jvmId.toString(), "/f", "/t"}) : + Runtime.getRuntime().exec(new String[] {"kill", "-9", jvmId.toString()}); + + killProc.waitFor(); + + res.add(jvmId); + } + } + catch (Exception e) { + // Print stack trace just for information. + X.printerrln("Could not kill IgniteNodeRunner java processes. Jvm pid = " + jvmId, e); + } + } + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8218fe6f/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java new file mode 100644 index 0000000..220424a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testframework.junits.multijvm; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Ignite proxy for ignite instance at another JVM. + */ +@SuppressWarnings("TransientFieldInNonSerializableClass") +public class IgniteProcessProxy implements IgniteEx { + /** Grid proxies. */ + private static final transient ConcurrentMap gridProxies = new ConcurrentHashMap<>(); + + /** Jvm process with ignite instance. */ + private final transient GridJavaProcess proc; + + /** Configuration. */ + private final transient IgniteConfiguration cfg; + + /** Local JVM grid. */ + private final transient Ignite locJvmGrid; + + /** Logger. */ + private final transient IgniteLogger log; + + /** Grid id. */ + private final UUID id = UUID.randomUUID(); + + /** + * @param cfg Configuration. + * @param log Logger. + * @param locJvmGrid Local JVM grid. + */ + public IgniteProcessProxy(final IgniteConfiguration cfg, final IgniteLogger log, final Ignite locJvmGrid) + throws Exception { + this.cfg = cfg; + this.locJvmGrid = locJvmGrid; + this.log = log.getLogger("jvm-" + id.toString().substring(0, id.toString().indexOf('-'))); + + String cfgFileName = IgniteNodeRunner.storeToFile(cfg.setNodeId(id)); + + List jvmArgs = U.jvmArgs(); + + Collection filteredJvmArgs = new ArrayList<>(); + + for (String arg : jvmArgs) { + if(!arg.toLowerCase().startsWith("-agentlib")) + filteredJvmArgs.add(arg); + } + + final CountDownLatch rmtNodeStartedLatch = new CountDownLatch(1); + + locJvmGrid.events().localListen(new NodeStartedListener(id, rmtNodeStartedLatch), EventType.EVT_NODE_JOINED); + + proc = GridJavaProcess.exec( + IgniteNodeRunner.class, + cfgFileName, // Params. + this.log, + // Optional closure to be called each time wrapped process prints line to system.out or system.err. + new IgniteInClosure() { + @Override public void apply(String s) { + IgniteProcessProxy.this.log.info(s); + } + }, + null, + filteredJvmArgs, // JVM Args. + System.getProperty("surefire.test.class.path") + ); + + assert rmtNodeStartedLatch.await(30, TimeUnit.SECONDS): "Remote node has not joined [id=" + id + ']'; + + IgniteProcessProxy prevVal = gridProxies.putIfAbsent(cfg.getGridName(), this); + + if (prevVal != null) { + remoteCompute().run(new IgniteRunnable() { + @Override public void run() { + G.stop(cfg.getGridName(), true); + } + }); + + throw new IllegalStateException("There was found instance assotiated with " + cfg.getGridName() + + ", instance= " + prevVal + ". New started node was stopped."); + } + } + + /** + */ + private static class NodeStartedListener extends IgnitePredicateX { + /** Id. */ + private final UUID id; + + /** Remote node started latch. */ + private final CountDownLatch rmtNodeStartedLatch; + + /** + * @param id Id. + * @param rmtNodeStartedLatch Remote node started latch. + */ + NodeStartedListener(UUID id, CountDownLatch rmtNodeStartedLatch) { + this.id = id; + this.rmtNodeStartedLatch = rmtNodeStartedLatch; + } + + /** {@inheritDoc} */ + @Override public boolean applyx(Event e) { + if (((DiscoveryEvent)e).eventNode().id().equals(id)) { + rmtNodeStartedLatch.countDown(); + + return false; + } + + return true; + } + } + + /** + * @param gridName Grid name. + * @return Instance by name or exception wiil be thrown. + */ + public static IgniteProcessProxy ignite(String gridName) { + IgniteProcessProxy res = gridProxies.get(gridName); + + if (res == null) + throw new IgniteIllegalStateException("Grid instance was not properly started " + + "or was already stopped: " + gridName + ". All known grid instances: " + gridProxies.keySet()); + + return res; + } + + /** + * @param gridName Grid name. + * @param cancel Cacnel flag. + */ + public static void stop(final String gridName, final boolean cancel) { + IgniteProcessProxy proxy = gridProxies.get(gridName); + + if (proxy != null) { + proxy.remoteCompute().run(new IgniteRunnable() { + @Override public void run() { + G.stop(gridName, cancel); + } + }); + + gridProxies.remove(gridName, proxy); + } + } + + /** + * For usage in closures. + * + * @return Ignite instance. + */ + private Ignite igniteById() { + return Ignition.ignite(id); + } + + /** + * @param locNodeId ID of local node the requested grid instance is managing. + * @return An instance of named grid. This method never returns {@code null}. + * @throws IgniteIllegalStateException Thrown if grid was not properly initialized or grid instance was stopped or + * was not started. + */ + public static Ignite ignite(UUID locNodeId) { + A.notNull(locNodeId, "locNodeId"); + + for (IgniteProcessProxy ignite : gridProxies.values()) { + if (ignite.getId().equals(locNodeId)) + return ignite; + } + + throw new IgniteIllegalStateException("Grid instance with given local node ID was not properly " + + "started or was stopped: " + locNodeId); + } + + /** + * Kill all running processes. + */ + public static void killAll() { + for (IgniteProcessProxy ignite : gridProxies.values()) { + try { + ignite.getProcess().kill(); + } + catch (Exception e) { + U.error(ignite.log, "Killing failed.", e); + } + } + + gridProxies.clear(); + } + + /** + * @return Local JVM grid instance. + */ + public Ignite localJvmGrid() { + return locJvmGrid; + } + + /** + * @return Grid id. + */ + public UUID getId() { + return id; + } + + /** {@inheritDoc} */ + @Override public String name() { + return cfg.getGridName(); + } + + /** {@inheritDoc} */ + @Override public IgniteLogger log() { + return log; + } + + /** {@inheritDoc} */ + @Override public IgniteConfiguration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalCache utilityCache() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteInternalCache cachex(@Nullable String name) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteInternalCache cachex() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public Collection> cachesx( + @Nullable IgnitePredicate>... p) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public boolean eventUserRecordable(int type) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public boolean allEventsUserRecordable(int[] types) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public boolean isJmxRemoteEnabled() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public boolean isRestartEnabled() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteFileSystem igfsx(@Nullable String name) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public Hadoop hadoop() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteClusterEx cluster() { + return new IgniteClusterProcessProxy(this); + } + + /** {@inheritDoc} */ + @Nullable @Override public String latestVersion() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + return remoteCompute().call(new IgniteCallable() { + @Override public ClusterNode call() throws Exception { + return ((IgniteEx)Ignition.ignite(id)).localNode(); + } + }); + } + + /** {@inheritDoc} */ + @Override public GridKernalContext context() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCompute compute() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCompute compute(ClusterGroup grp) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteMessaging message() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteMessaging message(ClusterGroup grp) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteEvents events() { + return new IgniteEventsProcessProxy(this); + } + + /** {@inheritDoc} */ + @Override public IgniteEvents events(ClusterGroup grp) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteServices services() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteServices services(ClusterGroup grp) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public ExecutorService executorService() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public ExecutorService executorService(ClusterGroup grp) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteProductVersion version() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteScheduler scheduler() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCache createCache(CacheConfiguration cacheCfg) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCache createCache(String cacheName) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCache getOrCreateCache(CacheConfiguration cacheCfg) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCache getOrCreateCache(String cacheName) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public void addCacheConfiguration(CacheConfiguration cacheCfg) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCache createCache(CacheConfiguration cacheCfg, + NearCacheConfiguration nearCfg) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCache getOrCreateCache(CacheConfiguration cacheCfg, + NearCacheConfiguration nearCfg) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCache createNearCache(@Nullable String cacheName, NearCacheConfiguration nearCfg) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCache getOrCreateNearCache(@Nullable String cacheName, + NearCacheConfiguration nearCfg) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public void destroyCache(String cacheName) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCache cache(@Nullable final String name) { + return new IgniteCacheProcessProxy<>(name, this); + } + + /** {@inheritDoc} */ + @Override public IgniteTransactions transactions() { + throw new UnsupportedOperationException("Transactions can't be supported automatically in multi JVM mode."); + } + + /** {@inheritDoc} */ + @Override public IgniteDataStreamer dataStreamer(@Nullable String cacheName) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteFileSystem fileSystem(String name) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public Collection fileSystems() { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicLong atomicLong(String name, long initVal, boolean create) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicReference atomicReference(String name, @Nullable T initVal, + boolean create) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicStamped atomicStamped(String name, @Nullable T initVal, @Nullable S initStamp, + boolean create) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteCountDownLatch countDownLatch(String name, int cnt, boolean autoDel, + boolean create) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteQueue queue(String name, int cap, + @Nullable CollectionConfiguration cfg) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public IgniteSet set(String name, @Nullable CollectionConfiguration cfg) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public T plugin(String name) throws PluginNotFoundException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + final CountDownLatch rmtNodeStoppedLatch = new CountDownLatch(1); + + locJvmGrid.events().localListen(new IgnitePredicateX() { + @Override public boolean applyx(Event e) { + if (((DiscoveryEvent)e).eventNode().id().equals(id)) { + rmtNodeStoppedLatch.countDown(); + + return false; + } + + return true; + } + }, EventType.EVT_NODE_LEFT, EventType.EVT_NODE_FAILED); + + compute().run(new IgniteRunnable() { + @Override public void run() { + igniteById().close(); + } + }); + + try { + assert U.await(rmtNodeStoppedLatch, 15, TimeUnit.SECONDS) : "NodeId=" + id; + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteException(e); + } + + try { + getProcess().kill(); + } + catch (Exception e) { + X.printerr("Could not kill process after close.", e); + } + } + + /** {@inheritDoc} */ + @Override public Affinity affinity(String cacheName) { + return new AffinityProcessProxy<>(cacheName, this); + } + + /** + * @return Jvm process in which grid node started. + */ + public GridJavaProcess getProcess() { + return proc; + } + + /** + * @return {@link IgniteCompute} instance to communicate with remote node. + */ + public IgniteCompute remoteCompute() { + ClusterGroup grp = locJvmGrid.cluster().forNodeId(id); + + if (grp.nodes().isEmpty()) + throw new IllegalStateException("Could not found node with id=" + id + "."); + + return locJvmGrid.compute(grp); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8218fe6f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiMultiJvmSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiMultiJvmSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiMultiJvmSelfTestSuite.java new file mode 100644 index 0000000..272305b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiMultiJvmSelfTestSuite.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.*; +import org.apache.ignite.internal.processors.cache.multijvm.*; + +/** + * Multi-JVM test suite. + */ +public class IgniteCacheFullApiMultiJvmSelfTestSuite extends TestSuite { + /** + * @return Multi-JVM tests suite. + * @throws Exception If failed. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Cache Full API Multi Jvm Test Suite"); + + // Multi-node. + suite.addTestSuite(GridCacheReplicatedMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheReplicatedMultiJvmP2PDisabledFullApiSelfTest.class); + suite.addTestSuite(GridCacheReplicatedAtomicMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheReplicatedAtomicPrimaryWriteOrderMultiJvmFullApiSelfTest.class); + + suite.addTestSuite(GridCachePartitionedMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCachePartitionedCopyOnReadDisabledMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicCopyOnReadDisabledMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCachePartitionedMultiJvmP2PDisabledFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicMultiJvmP2PDisabledFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderMultiJvmP2PDisabledFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicNearEnabledMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicNearEnabledPrimaryWriteOrderMultiJvmFullApiSelfTest.class); + + suite.addTestSuite(GridCachePartitionedNearDisabledMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCachePartitionedNearDisabledMultiJvmP2PDisabledFullApiSelfTest.class); + + suite.addTestSuite(GridCacheNearOnlyMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheNearOnlyMultiJvmP2PDisabledFullApiSelfTest.class); + suite.addTestSuite(GridCacheReplicatedNearOnlyMultiJvmFullApiSelfTest.class); + + suite.addTestSuite(GridCacheAtomicClientOnlyMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicClientOnlyMultiJvmP2PDisabledFullApiSelfTest.class); + + suite.addTestSuite(GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicNearOnlyMultiJvmP2PDisabledFullApiSelfTest.class); + + suite.addTestSuite(GridCachePartitionedFairAffinityMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCachePartitionedNearDisabledFairAffinityMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicFairAffinityMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicNearEnabledFairAffinityMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderFairAffinityMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheNearOnlyFairAffinityMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicClientOnlyFairAffinityMultiJvmFullApiSelfTest.class); + + // Multi-node with off-heap values. + suite.addTestSuite(GridCacheReplicatedOffHeapMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCachePartitionedOffHeapMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicOffHeapMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicPrimaryWrityOrderOffHeapMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCachePartitionedNearDisabledOffHeapMultiJvmFullApiSelfTest.class); + + // Multi-node with off-heap tiered mode. + suite.addTestSuite(GridCacheReplicatedOffHeapTieredMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCachePartitionedOffHeapTieredMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicOffHeapTieredMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicPrimaryWrityOrderOffHeapTieredMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCachePartitionedNearDisabledOffHeapTieredMultiJvmFullApiSelfTest.class); + suite.addTestSuite(GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiJvmFullApiSelfTest.class); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8218fe6f/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index bfa9f62..dbaa330 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -720,5 +720,45 @@ + + + tools.jar-default + + + + ${java.home}/../lib/tools.jar + + + + + + com.sun + tools + 1.4.2 + system + ${java.home}/../lib/tools.jar + + + + + + tools.jar-mac + + + + ${java.home}/../Classes/classes.jar + + + + + + com.sun + tools + 1.4.2 + system + ${java.home}/../Classes/classes.jar + + + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8218fe6f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b47d34b..9d1b7d9 100644 --- a/pom.xml +++ b/pom.xml @@ -678,7 +678,6 @@ - schema-import