ignite-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Yakov Zhdanov (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (IGNITE-3412) Client instance hangs on close
Date Fri, 29 Jul 2016 09:08:20 GMT

    [ https://issues.apache.org/jira/browse/IGNITE-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399009#comment-15399009 ]

Yakov Zhdanov commented on IGNITE-3412:
---------------------------------------

Guys, this looks really weird.

Can you please try this class with debug output?

{noformat}
    /**
     *
     */
    private class SocketWriter extends IgniteSpiThread {
        /** */
        private final Object mux = new Object();

        /** */
        private Socket sock;

        /** */
        private boolean clientAck;

        /** */
        private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();

        /** */
        private final long socketTimeout;

        /** */
        private TcpDiscoveryAbstractMessage unackedMsg;

        /**
         *
         */
        protected SocketWriter() {
            super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);

            socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout()
:
                spi.getSocketTimeout();
        }

        /**
         * @param msg Message.
         */
        private void sendMessage(TcpDiscoveryAbstractMessage msg) {
            synchronized (mux) {
                queue.add(msg);

                mux.notifyAll();
            }
        }

        /**
         * @param sock Socket.
         * @param clientAck {@code True} is server supports client message acknowlede.
         */
        private void setSocket(Socket sock, boolean clientAck) {
            synchronized (mux) {
                this.sock = sock;

                this.clientAck = clientAck;

                unackedMsg = null;

                mux.notifyAll();
            }
        }

        /**
         * @return {@code True} if connection is alive.
         */
        public boolean isOnline() {
            synchronized (mux) {
                return sock != null;
            }
        }

        /**
         * @param res Acknowledge response.
         */
        void ackReceived(TcpDiscoveryClientAckResponse res) {
            synchronized (mux) {
                if (unackedMsg != null) {
                    assert unackedMsg.id().equals(res.messageId()) : unackedMsg;

                    unackedMsg = null;
                }

                mux.notifyAll();
            }
        }

        /** {@inheritDoc} */
        @Override public void interrupt() {
            super.interrupt();

            U.debug(log, "SocketWriter has been interrupted: " + System.identityHashCode(this));
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException {
            TcpDiscoveryAbstractMessage msg = null;

            while (!Thread.currentThread().isInterrupted()) {
                Socket sock;

                U.debug(log, "1 SocketWriter thread interrupted status " +
                    "[status=" + Thread.currentThread().isInterrupted() +
                    ", hash=" + System.identityHashCode(this) + ']');

                synchronized (mux) {
                    sock = this.sock;

                    if (sock == null) {
                        mux.wait();

                        continue;
                    }

                    if (msg == null)
                        msg = queue.poll();

                    if (msg == null) {
                        mux.wait();

                        continue;
                    }
                }

                U.debug(log, "2 SocketWriter thread interrupted status " +
                    "[status=" + Thread.currentThread().isInterrupted() +
                    ", hash=" + System.identityHashCode(this) + ']');

                for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
                    msgLsnr.apply(msg);

                boolean ack = clientAck && !(msg instanceof TcpDiscoveryPingResponse);

                try {
                    if (ack) {
                        synchronized (mux) {
                            assert unackedMsg == null : "Unacked=" + unackedMsg + ", received="
+ msg;

                            unackedMsg = msg;
                        }
                    }

                    U.debug(log, "3 SocketWriter thread interrupted status " +
                        "[status=" + Thread.currentThread().isInterrupted() +
                        ", hash=" + System.identityHashCode(this) + ']');

                    spi.writeToSocket(
                        sock,
                        msg,
                        socketTimeout);

                    msg = null;

                    U.debug(log, "4 SocketWriter thread interrupted status " +
                        "[status=" + Thread.currentThread().isInterrupted() +
                        ", hash=" + System.identityHashCode(this) + ']');


                    if (ack) {
                        long waitEnd = U.currentTimeMillis() + (spi.failureDetectionTimeoutEnabled()
?
                            spi.failureDetectionTimeout() : spi.getAckTimeout());

                        TcpDiscoveryAbstractMessage unacked;

                        synchronized (mux) {
                            while (unackedMsg != null && U.currentTimeMillis() <
waitEnd)
                                mux.wait(waitEnd);

                            unacked = unackedMsg;

                            unackedMsg = null;
                        }

                        U.debug(log, "5 SocketWriter thread interrupted status " +
                            "[status=" + Thread.currentThread().isInterrupted() +
                            ", hash=" + System.identityHashCode(this) + ']');


                        if (unacked != null) {
                            if (log.isDebugEnabled())
                                log.debug("Failed to get acknowledge for message, will try
to reconnect " +
                                "[msg=" + unacked +
                                (spi.failureDetectionTimeoutEnabled() ?
                                ", failureDetectionTimeout=" + spi.failureDetectionTimeout()
:
                                ", timeout=" + spi.getAckTimeout()) + ']');

                            throw new IOException("Failed to get acknowledge for message:
" + unacked);
                        }
                    }
                }
                catch (IOException e) {
                    log.error("DEBUG error thrown.", e); // TODO

                    if (log.isDebugEnabled())
                        U.error(log, "Failed to send node left message (will stop anyway)
" +
                            "[sock=" + sock + ", msg=" + msg + ']', e);

                    U.debug(log, "6 SocketWriter thread interrupted status " +
                        "[status=" + Thread.currentThread().isInterrupted() +
                        ", hash=" + System.identityHashCode(this) + ']');

                    U.closeQuiet(sock);

                    U.debug(log, "7 SocketWriter thread interrupted status " +
                        "[status=" + Thread.currentThread().isInterrupted() +
                        ", hash=" + System.identityHashCode(this) + ']');

                    synchronized (mux) {
                        if (sock == this.sock)
                            this.sock = null; // Connection has dead.
                    }
                }
                catch (IgniteCheckedException e) {
                    log.error("DEBUG Ignite error thrown.", e); // TODO

                    U.error(log, "Failed to send message: " + msg, e);

                    U.debug(log, "8 SocketWriter thread interrupted status " +
                        "[status=" + Thread.currentThread().isInterrupted() +
                        ", hash=" + System.identityHashCode(this) + ']');

                    msg = null;
                }
            }
        }
    }

{noformat}

> Client instance hangs on close
> ------------------------------
>
>                 Key: IGNITE-3412
>                 URL: https://issues.apache.org/jira/browse/IGNITE-3412
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 1.6
>            Reporter: Alexei Scherbakov
>            Assignee: Alexei Scherbakov
>             Fix For: 1.7
>
>         Attachments: SocketsTest.zip, threadDump.txt
>
>
> In some cases, calling close on an Ignite client instance leads to a deadlock.
> The deadlock happens for the following reason.
> Socket writer is waiting for new messages.
> {code}
> "tcp-client-disco-sock-writer-#2%null%" #100 prio=6 os_prio=0 tid=0x000000005fad2800 nid=0x13bc in Object.wait() [0x0000000067d0e000]
>    java.lang.Thread.State: WAITING (on object monitor)
> 	at java.lang.Object.wait(Native Method)
> 	at java.lang.Object.wait(Object.java:502)
> 	at org.apache.ignite.spi.discovery.tcp.ClientImpl$SocketWriter.body(ClientImpl.java:1051)
> 	- locked <0x00000000863da2f8> (a java.lang.Object)
> 	at org.apache.ignite.spi.IgniteSpiThread.run(IgniteSpiThread.java:62)
> {code}
> The closing process hangs because TcpDiscoverySpi waits until this writer terminates:
> {code}
> "Thread-6" #29 prio=6 os_prio=0 tid=0x000000005a740000 nid=0x17e8 in Object.wait() [0x000000006077e000]
>    java.lang.Thread.State: WAITING (on object monitor)
> 	at java.lang.Object.wait(Native Method)
> 	at java.lang.Thread.join(Thread.java:1245)
> 	- locked <0x00000000863da010> (a org.apache.ignite.spi.discovery.tcp.ClientImpl$SocketWriter)
> 	at java.lang.Thread.join(Thread.java:1319)
> 	at org.apache.ignite.internal.util.IgniteUtils.join(IgniteUtils.java:4476)
> 	at org.apache.ignite.spi.discovery.tcp.ClientImpl.spiStop(ClientImpl.java:295)
> 	at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStop(TcpDiscoverySpi.java:1905)
> 	at org.apache.ignite.internal.managers.GridManagerAdapter.stopSpi(GridManagerAdapter.java:325)
> 	at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.stop(GridDiscoveryManager.java:1336)
> 	at org.apache.ignite.internal.IgniteKernal.stop0(IgniteKernal.java:1940)
> 	at org.apache.ignite.internal.IgniteKernal.stop(IgniteKernal.java:1812)
> 	at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.stop0(IgnitionEx.java:2248)
> 	- locked <0x00000000858e77a8> (a org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance)
> 	at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.stop(IgnitionEx.java:2211)
> 	at org.apache.ignite.internal.IgnitionEx.stop(IgnitionEx.java:322)
> 	at org.apache.ignite.Ignition.stop(Ignition.java:224)
> 	at org.apache.ignite.internal.IgniteKernal.close(IgniteKernal.java:2921)
> 	at ru.sbrf.ggcod.loader.job.MainLoader.run(MainLoader.java:123)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> There is some race that leads to a situation where the writer hangs in {{Object.wait}}, ignoring the interrupted flag that was set at some point in time.
> The full thread dump is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message