river-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gregg Wonderly <gr...@wonderly.org>
Subject Re: Expected behavior on the remote service when the local caller is interrupted? Should the remote method be interrupted as well?
Date Mon, 23 Dec 2013 18:45:30 GMT
This is one of the places where a lease could help.  An extension of the existing JERI details
could add a lease into the dispatcher layer so that a constant “I am here” message would
come through to the service.  If the client thread is interrupted it would no longer be pinging/notifying
of it’s interest in the results.  That would allow the service end, to take appropriate
actions.  I think that I’d want the export operation or exporter creation, to include the
setup of a call back that would occur when an client wants something to stop.  I would make
the API include a “correlation-ID”, and I’d have that passed into the call to do work,
and passed into the call back for cancellation.

Gregg

On Dec 20, 2013, at 7:12 AM, Bryan Thompson <bryan@systap.com> wrote:

> Hello, I put together a test to examine what would occur if a local service submits an
RMI (via jeri) to a remote service. This unit test setups up a remote service (in a child
JVM) and then issues an RMI that invokes a {@link Thread#sleep(long)} method on the remote
service. The thread that issues the RMI is then interrupted during the sleep. The exception
when the local thread is interrupted is provided below.
> 
> 
> With reference to the SleepTask below, what I see is on the remote service is:
> 
> 
> WARN : 08:03:48,137 2861      com.bigdata.journal.jini.ha.HAJournalTest.executorService1
com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:498):
Will sleep: millis=6000
> 
> …
> WARN : 08:03:54,138 8862      com.bigdata.journal.jini.ha.HAJournalTest.executorService1
com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:501):
Sleep finished normally.
> WARN : 08:03:54,139 8863      com.bigdata.journal.jini.ha.HAJournalTest.executorService1
com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:508):
Did sleep: millis=6000
> 
> 
> Thus, it appears that the interrupt of the local thread making the RMI does NOT interrupt
the thread that is executing the behavior on the remote service (in this case, Thread.sleep()).
> 
> 
> A snip of this test is below – it depends on our test harness environment, but I could
isolate it to a river only test if desired (this would be easier if I had a pointer to a pattern
for starting and killing the child process that I could use for a river test).  But first
I wanted to see what people's expectations were for the remote service when the RMI is interrupted.
Should there be an attempt to propagate the interrupt to the method on the remote service?
> 
> 
> Thanks,
> 
> Bryan
> 
> 
> java.io.IOException: request I/O interrupted
> 
> java.rmi.UnmarshalException: exception unmarshalling response; nested exception is:
> 
> java.io.IOException: request I/O interrupted
> 
> at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:847)
> 
> at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethod(BasicInvocationHandler.java:659)
> 
> at net.jini.jeri.BasicInvocationHandler.invoke(BasicInvocationHandler.java:528)
> 
> at $Proxy2.submit(Unknown Source)
> 
> at com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:399)
> 
> at com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:1)
> 
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
> 
> at java.util.concurrent.FutureTask.run(FutureTask.java:166)
> 
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> 
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
> 
> at java.util.concurrent.FutureTask.run(FutureTask.java:166)
> 
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
> 
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
> 
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> 
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> 
> at java.lang.Thread.run(Thread.java:722)
> 
> Caused by: java.io.IOException: request I/O interrupted
> 
> at com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:833)
> 
> at net.jini.jeri.connection.ConnectionManager$Outbound$Input.read(ConnectionManager.java:550)
> 
> at net.jini.jeri.BasicObjectEndpoint.executeCall(BasicObjectEndpoint.java:410)
> 
> at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:806)
> 
> ... 15 more
> 
> Caused by: java.lang.InterruptedException
> 
> at java.lang.Object.wait(Native Method)
> 
> at java.lang.Object.wait(Object.java:503)
> 
> at com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:829)
> 
> ... 18 more
> 
> 
>    /**
> 
>     * This test is used to characterize what happens when we interrupt an RMI.
> 
>     * Most methods on the {@link HAGlue} interface are synchronous - they block
> 
>     * while some behavior is executed. This is even true for some methods that
> 
>     * return a {@link Future} in order to avoid overhead associated with the
> 
>     * export of a proxy and DGC thread leaks (since fixed in River).
> 
>     * <p>
> 
>     * This unit test setups up a service and then issues an RMI that invokes a
> 
>     * {@link Thread#sleep(long)} method on the service. The thread that issues
> 
>     * the RMI is then interrupted during the sleep.
> 
>     *
> 
>     * @throws Exception
> 
>     */
> 
>    public void test_interruptRMI() throws Exception {
> 
> 
>        // Start a service.
> 
>        final HAGlue serverA = startA();
> 
> 
>        final AtomicReference<Throwable> localCause = new AtomicReference<Throwable>();
> 
> 
>        final ExecutorService executorService = Executors
> 
>                .newSingleThreadScheduledExecutor(DaemonThreadFactory
> 
>                        .defaultThreadFactory());
> 
> 
> 
>        try {
> 
> 
>            final FutureTask<Void> localFuture = new FutureTask<Void>(
> 
>                    new Callable<Void>() {
> 
> 
>                        @Override
> 
>                        public Void call() throws Exception {
> 
> 
>                            try {
> 
>                                final Future<Void> ft = ((HAGlueTest) serverA)
> 
>                                        .submit(new SleepTask(6000/* ms */),
> 
>                                                false/* asyncFuture */);
> 
> 
>                                return ft.get();
> 
>                            } catch (Throwable t) {
> 
>                                localCause.set(t);
> 
>                                log.error(t, t);
> 
>                                throw new RuntimeException(t);
> 
>                            } finally {
> 
>                                log.warn("Local submit of remote task is done.");
> 
>                            }
> 
>                        }
> 
>                    });
> 
>            /*
> 
>             * Submit task that will execute sleep on A. This task will block
> 
>             * until A finishes its sleep. When we cancel this task, the RMI to
> 
>             * A will be interrupted.
> 
>             */
> 
>            executorService.execute(localFuture);
> 
> 
> 
>            // Wait a bit to ensure that the task was started on A.
> 
>            Thread.sleep(2000/* ms */);
> 
> 
>            // interrupt the local future. will cause interrupt of the RMI.
> 
>            localFuture.cancel(true/*mayInterruptIfRunning*/);
> 
> 
>        } finally {
> 
> 
> 
>            executorService.shutdownNow();
> 
> 
> 
>        }
> 
> 
>        /*
> 
>         * The local root cause of the RMI failure is an InterruptedException.
> 
>         *
> 
>         * Note: There is a data race between when the [localCause] is set and
> 
>         * when we exit the code block above. This is because we are
> 
>         * interrupting the local task and have no means to await the completion
> 
>         * of its error handling routine which sets the [localCause].
> 
>         */
> 
>        {
> 
>            assertCondition(new Runnable() {
> 
>                @Override
> 
>                public void run() {
> 
>                    final Throwable tmp = localCause.get();
> 
>                    assertNotNull(tmp);
> 
>                    assertTrue(InnerCause.isInnerCause(tmp,
> 
>                            InterruptedException.class));
> 
>                }
> 
>            }, 10000/*timeout*/, TimeUnit.MILLISECONDS);
> 
>        }
> 
> 
>        /*
> 
>         * Verify the root cause as observed by A for the interrupt. It should
> 
>         * also be an InterruptedException.
> 
>         *
> 
>         * Note: Again, there is a data race.
> 
>         *
> 
>         * Note: Because we might retry this, we do NOT use the getAndClearXXX()
> 
>         * method to recover the remote exception.
> 
>         */
> 
>        {
> 
>            assertCondition(new Runnable() {
> 
>                @Override
> 
>                public void run() {
> 
>                    Throwable tmp;
> 
>                    try {
> 
>                        tmp = ((HAGlueTest) serverA).getLastRootCause();
> 
>                    } catch (IOException e) {
> 
>                        throw new RuntimeException(e);
> 
>                    }
> 
>                    assertNotNull(tmp);
> 
>                    log.warn("Received non-null lastRootCause=" + tmp, tmp);
> 
>                    assertTrue(InnerCause.isInnerCause(tmp,
> 
>                            InterruptedException.class));
> 
>                }
> 
>            }, 10000/* timeout */, TimeUnit.MILLISECONDS);
> 
>        }
> 
> 
> 
>    }
> 
> 
>    /**
> 
>     * Task sleeps for a specified duration.
> 
>     *
> 
>     * @author <a href="mailto:thompsonbry@users.sourceforge.net">Bryan
> 
>     *         Thompson</a>
> 
>     */
> 
>    private static class SleepTask extends IndexManagerCallable<Void> {
> 
> 
>        private static final long serialVersionUID = 1L;
> 
> 
>        private long millis;
> 
> 
>        SleepTask(final long millis) {
> 
>            this.millis = millis;
> 
>        }
> 
> 
>        @Override
> 
>        public Void call() throws Exception {
> 
>            log.warn("Will sleep: millis=" + millis);
> 
>            try {
> 
>                Thread.sleep(millis);
> 
>                log.warn("Sleep finished normally.");
> 
>            } catch (Throwable t) {
> 
>                log.error("Exception during sleep: "+t, t);
> 
>                ((HAJournalTest) getIndexManager()).getRemoteImpl()
> 
>                        .setLastRootCause(t);
> 
>                throw new RuntimeException(t);
> 
>            } finally {
> 
>                log.warn("Did sleep: millis=" + millis);
> 
>            }
> 
>            return null;
> 
>        }
> 
> 
>    }
> 


Mime
View raw message