ignite-user mailing list archives

From John Smith <java.dev....@gmail.com>
Subject cache.getAsync() blocks if cluster is not activated.
Date Wed, 15 Jul 2020 18:57:15 GMT
Hi,

I am testing some failover scenarios.

When we call cache.getAsync() while the cluster is not active, the call
seems to block.

I implemented a cache repository on top of Vert.x, shown below. It seems to
block at cacheOperation.apply(cache), so when I call myRepo.get(myKey),
which applies cache.getAsync() underneath, the call blocks.

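import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import javax.cache.processor.EntryProcessor;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.lang.IgniteFuture;

public class IgniteCacheRepository<K,V> implements CacheRepository<K,V> {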
    public static final long DEFAULT_OPERATION_TIMEOUT = 1000;
    private static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;

    private Vertx vertx;
    private IgniteCache<K, V> cache;

    public IgniteCacheRepository(Vertx vertx, IgniteCache<K, V> cache) {
        this.vertx = vertx;
        this.cache = cache;
    }

    @Override
    public Future<Void> put(K key, V value) {
        return executeAsync(cache -> cache.putAsync(key, value),
                DEFAULT_OPERATION_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
    }

    @Override
    public Future<V> get(K key) {
        return executeAsync(cache -> cache.getAsync(key),
                DEFAULT_OPERATION_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
    }

    @Override
    public <T> Future<T> invoke(K key, EntryProcessor<K, V, T> processor, Object... arguments) {
        return executeAsync(cache -> cache.invokeAsync(key, processor, arguments),
                DEFAULT_OPERATION_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
    }

    @Override
    public <T> T cache() {
        return (T) cache;
    }

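    /**
     * Adapts an Ignite async operation to a Vert.x future.
     *
     * @param cacheOperation The Ignite operation to execute asynchronously.
     * @param timeout Maximum time to wait for the operation's result.
     * @param unit Unit of the timeout.
     * @return A Vert.x future completed with the result of the cache operation.
     */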
    private <T> Future<T> executeAsync(Function<IgniteCache<K, V>, IgniteFuture<T>> cacheOperation,
                                       long timeout, TimeUnit unit) {
        Future<T> future = Future.future();

        try {
            IgniteFuture<T> value = cacheOperation.apply(cache);

            value.listenAsync(result -> {
                try {
                    future.complete(result.get(timeout, unit));
                } catch(Exception ex) {
                    future.fail(ex);
                }
            }, VertxIgniteExecutorAdapter.getOrCreate(vertx.getOrCreateContext()));
        } catch(Exception ex) {
            // Ignite cache operations can throw a RuntimeException synchronously.
            future.fail(ex);
        }

        return future;
    }
}
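
Note that the timeout passed to executeAsync only takes effect inside the listener, after the Ignite future already exists, so it cannot bound a call that blocks inside cacheOperation.apply(cache) itself. Below is a minimal sketch of one way to bound such a call, using only the JDK (Java 9+ for orTimeout). GuardedCall, callWithTimeout, and newDaemonPool are hypothetical names, not Ignite or Vert.x APIs, and the Supplier merely stands in for the cache operation; the blocking is simulated with a sleep.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class GuardedCall {

    // Hypothetical helper: daemon threads, so a call that never returns
    // cannot keep the JVM alive on shutdown.
    static ExecutorService newDaemonPool() {
        return Executors.newCachedThreadPool(runnable -> {
            Thread thread = new Thread(runnable, "guarded-call");
            thread.setDaemon(true);
            return thread;
        });
    }

    // Hypothetical helper: runs a possibly blocking call on a worker thread
    // and fails the returned future with TimeoutException if no result
    // arrives in time. The worker thread itself is NOT interrupted.
    static <T> CompletableFuture<T> callWithTimeout(Supplier<T> blockingCall,
                                                   ExecutorService worker,
                                                   long timeout, TimeUnit unit) {
        return CompletableFuture.supplyAsync(blockingCall, worker)
                .orTimeout(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService worker = newDaemonPool();

        // A call that returns promptly completes normally.
        System.out.println(
                callWithTimeout(() -> "value", worker, 1, TimeUnit.SECONDS).get());

        // A call that blocks (simulated here) fails once the timeout elapses.
        try {
            callWithTimeout(() -> {
                try {
                    Thread.sleep(5_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "late";
            }, worker, 100, TimeUnit.MILLISECONDS).get();
        } catch (ExecutionException e) {
            System.out.println("timed out: " + (e.getCause() instanceof TimeoutException));
        }
    }
}
```

This only frees the caller; the worker thread stays parked until the underlying call returns. In a Vert.x application the same idea would more likely go through vertx.executeBlocking than a raw executor, which is an assumption about the setup rather than something tested here.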
