brooklyn-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BROOKLYN-440) More efficient thread usage for ssh execution
Date Tue, 13 Jun 2017 12:38:00 GMT

    [ https://issues.apache.org/jira/browse/BROOKLYN-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16047812#comment-16047812
] 

ASF GitHub Bot commented on BROOKLYN-440:
-----------------------------------------

Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/731#discussion_r121662464
  
    --- Diff: utils/common/src/main/java/org/apache/brooklyn/util/stream/LoggingOutputStream.java
---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.brooklyn.util.stream;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.slf4j.Logger;
    +
    +/**
    + * Wraps another output stream, intercepting the writes to log it.
    + */
    +public class LoggingOutputStream extends FilterOutputStream {
    +
    +    private static final OutputStream NOOP_OUTPUT_STREAM = new FilterOutputStream(null)
{
    +        @Override public void write(int b) throws IOException {
    +        }
    +        @Override public void flush() throws IOException {
    +        }
    +        @Override public void close() throws IOException {
    +        }        
    +    };
    +    
    +    public static Builder builder() {
    +        return new Builder();
    +    }
    +    
    +    public static class Builder {
    +        OutputStream out;
    +        Logger log;
    +        String logPrefix;
    +        
    +        public Builder outputStream(OutputStream val) {
    +            this.out = val;
    +            return this;
    +        }
    +        public Builder logger(Logger val) {
    +            this.log = val;
    +            return this;
    +        }
    +        public Builder logPrefix(String val) {
    +            this.logPrefix = val;
    +            return this;
    +        }
    +        public LoggingOutputStream build() {
    +            return new LoggingOutputStream(this);
    +        }
    +    }
    +    
    +    protected final Logger log;
    +    protected final String logPrefix;
    +    private final AtomicBoolean running = new AtomicBoolean(true);
    +    private final StringBuilder lineSoFar = new StringBuilder(16);
    +
    +    private LoggingOutputStream(Builder builder) {
    +        super(builder.out != null ? builder.out : NOOP_OUTPUT_STREAM);
    +        log = builder.log;
    +        logPrefix = (builder.logPrefix != null) ? builder.logPrefix : "";
    +      }
    +
    +    @Override
    +    public void write(int b) throws IOException {
    +        if (running.get() && b >= 0) onChar(b);
    --- End diff --
    
    the `b >= 0` here, plus the use of `StringBuilder` and `lineSoFar.append((char)c);`
in `onChar`, will make things work wrongly for unicode, e.g. 
    
    ```
        @Test
        public void testLogsUnicode() throws Exception {
            LoggingOutputStream out = LoggingOutputStream.builder().logger(mockLogger).build();
            String test = "Лорем.";
            out.write("Лорем.\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
    
            assertEquals(logs, ImmutableList.of(test));
        }
    ```
    fails with `java.lang.AssertionError: Lists differ at element [0]: Лорем. != . expected
[Лорем.] but found [.]
    `.
    
    We could drop the `b >= 0` and use a list of bytes for `lineSoFar` in order to support
Unicode if you felt that was valuable.


> More efficient thread usage for ssh execution
> ---------------------------------------------
>
>                 Key: BROOKLYN-440
>                 URL: https://issues.apache.org/jira/browse/BROOKLYN-440
>             Project: Brooklyn
>          Issue Type: Improvement
>    Affects Versions: 0.10.0
>            Reporter: Aled Sage
>            Priority: Minor
>
> For consuming the stdout/stderr from ssh execution, our use of {{PipedInputStream}}/{{PipedOutputStream}}
and {{StreamGobbler}} looks very inefficient (my fault - I wrote it originally!)
> We normally consume 6 threads per ssh execution:
> 1. The calling thread of {{SshMachineLocation.execScript}} is blocker, waiting for it
to complete.
> 2. Within sshj, there is a "sftp reader" thread that reads the packets
> 3. Within {{SshjTool.ExecAction}}, we create a {{StreamGobbler}} thread to read the ssh
stdout, as it is made available.
> 4. Same for stderr.
> 5. Within {{ExecWithLoggingHelpers.execWithLogging}}, we create a {{StreamGobber}} thread
to read + log the ssh stdout, as it is made available.
> 6. Same for stderr.
> I'm pretty sure we can get rid of threads 5 and 6; not sure about 3 and 4 though.
> Here is the chain of actions:
> * We call something like {{sshMachineLocation.execScript}}. This can include "out" and
"err" config, to obtain the exec stdout/stderr.
> * {{SshMachineLocation}} wraps the command execution in {{ExecWithLoggingHelpers.execWithLogging}}.
> * In order to log the stdout (and same for stderr):
>   * It creates a {{PipedOutputStream}} and {{PipedInputStream}}. It sets the {{PipedOutputStream}}
as the stdout to use (i.e. config passed to {{SshjTool}})
>   * It creates a {{StreamGobbler}}, which is a thread that consumes the {{PipedInputStream}}
- this logs each line, and also writes each line to the original "out".
> * Within {{SshjTool.ExecAction}}, it has an sshj {{Session.Command.getInputStream()}}
and {{.getErrorStream()}} for reading the stdout and stderr.
>   It creates a {{StreamGobbler}}, which is a thread to consume these input streams; it
writes the bytes received from that input stream to the {{out}} and {{err}} streams passed
in.
> For the logging, a simpler and more efficient approach would be to wrap the OutputStream.
See {{com.google.common.io.CountingOutputStream}} for inspiration. It should be as simple
as extending {{java.util.FilterOutputStream}}, and overriding the {{write}} and {{close}}
methods. The implementation of these methods would call the wrapped outputStream as well as
doing some thing very similar to {{StreamGobbler.onChar()}}.
> I don't see a way to do the same trick inside {{SshjTool.ExecAction}}, unfortunately,
without changes to sshj to wrap {{ChannelInputStream}} which is created inside {{ net.schmizz.sshj.connection.channel.AbstractChannel}}
's constructor (or to override {{AbstractChannel.receiveInto}} perhaps). None of those seem
worth it.
> Below is a trimmed down jstack from executing {{sleep 100}} over ssh:
> {noformat}
> 2017-02-13 11:09:35
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.80-b11 mixed mode):
> "main" prio=5 tid=0x00007f8ba180a000 nid=0x1c03 waiting on condition [0x0000700006b21000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x00000007f4069868> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
>     at net.schmizz.concurrent.Promise.tryRetrieve(Promise.java:170)
>     at net.schmizz.concurrent.Promise.retrieve(Promise.java:137)
>     at net.schmizz.concurrent.Event.await(Event.java:103)
>     at net.schmizz.sshj.connection.channel.AbstractChannel.join(AbstractChannel.java:259)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:1009)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:926)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:627)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:613)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$1.run(SshjTool.java:327)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.execScript(SshjTool.java:329)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$1.exec(ExecWithLoggingHelpers.java:83)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:168)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:165)
>     at org.apache.brooklyn.util.pool.BasicPool.exec(BasicPool.java:146)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation.execSsh(SshMachineLocation.java:601)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation$13.execWithTool(SshMachineLocation.java:780)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execWithLogging(ExecWithLoggingHelpers.java:165)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execScript(ExecWithLoggingHelpers.java:81)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:764)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:758)
>     at org.apache.brooklyn.location.ssh.SshMachineLocationIntegrationTest.testSlowForVisualInspection(SshMachineLocationIntegrationTest.java:98)
> "sftp reader" prio=5 tid=0x00007f8ba43a6800 nid=0x6503 in Object.wait() [0x0000700008571000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f3f15690> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f3f15690> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at net.schmizz.sshj.sftp.PacketReader.readIntoBuffer(PacketReader.java:51)
>     at net.schmizz.sshj.sftp.PacketReader.getPacketLength(PacketReader.java:59)
>     at net.schmizz.sshj.sftp.PacketReader.readPacket(PacketReader.java:75)
>     at net.schmizz.sshj.sftp.PacketReader.run(PacketReader.java:87)
> "Thread-7" prio=5 tid=0x00007f8ba1a9f800 nid=0x6903 in Object.wait() [0x0000700008777000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f4069948> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f4069948> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
>     - locked <0x00000007f4069930> (a [B)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "Thread-6" prio=5 tid=0x00007f8ba1b2b000 nid=0x6703 in Object.wait() [0x0000700008674000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f4061698> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f4061698> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
>     - locked <0x00000007f4061680> (a [B)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "reader" prio=5 tid=0x00007f8ba385c000 nid=0x6303 runnable [0x000070000846e000]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.SocketInputStream.socketRead0(Native Method)
>     at java.net.SocketInputStream.read(SocketInputStream.java:152)
>     at java.net.SocketInputStream.read(SocketInputStream.java:122)
>     at net.schmizz.sshj.transport.Reader.run(Reader.java:50)
> "brooklyn-execmanager-SrVbReTz-3" daemon prio=5 tid=0x00007f8ba21eb000 nid=0x6103 waiting
on condition [0x000070000836b000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x00000007fc0088e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
>     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> "Thread-2" prio=5 tid=0x00007f8ba21ea000 nid=0x5f03 in Object.wait() [0x0000700008268000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007fc018088> (a java.io.PipedInputStream)
>     at java.io.PipedInputStream.read(PipedInputStream.java:327)
>     - locked <0x00000007fc018088> (a java.io.PipedInputStream)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "Thread-1" prio=5 tid=0x00007f8ba20e7800 nid=0x5d03 in Object.wait() [0x0000700008165000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007fc020088> (a java.io.PipedInputStream)
>     at java.io.PipedInputStream.read(PipedInputStream.java:327)
>     - locked <0x00000007fc020088> (a java.io.PipedInputStream)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> {noformat}
> ([~svet] I spoke to you about this previously, and believe you have some comments?)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message