Date: Mon, 19 Jun 2017 14:52:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: dev@brooklyn.apache.org
Subject: [jira] [Commented] (BROOKLYN-440) More efficient thread usage for ssh execution

    [ https://issues.apache.org/jira/browse/BROOKLYN-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054137#comment-16054137 ]

ASF GitHub Bot commented on BROOKLYN-440:
-----------------------------------------

Github user neykov commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/731#discussion_r122729269

    --- Diff: utils/common/src/main/java/org/apache/brooklyn/util/stream/LoggingOutputStream.java ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.brooklyn.util.stream;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.slf4j.Logger;
    +
    +/**
    + * Wraps another output stream, intercepting the writes to log it.
    + */
    +public class LoggingOutputStream extends FilterOutputStream {
    +
    +    private static final OutputStream NOOP_OUTPUT_STREAM = new FilterOutputStream(null) {
    +        @Override public void write(int b) throws IOException {
    +        }
    +        @Override public void flush() throws IOException {
    +        }
    +        @Override public void close() throws IOException {
    +        }
    +    };
    +
    +    public static Builder builder() {
    +        return new Builder();
    +    }
    +
    +    public static class Builder {
    +        OutputStream out;
    +        Logger log;
    +        String logPrefix;
    +
    +        public Builder outputStream(OutputStream val) {
    +            this.out = val;
    +            return this;
    +        }
    +        public Builder logger(Logger val) {
    +            this.log = val;
    +            return this;
    +        }
    +        public Builder logPrefix(String val) {
    +            this.logPrefix = val;
    +            return this;
    +        }
    +        public LoggingOutputStream build() {
    +            return new LoggingOutputStream(this);
    +        }
    +    }
    +
    +    protected final Logger log;
    +    protected final String logPrefix;
    +    private final AtomicBoolean running = new AtomicBoolean(true);
    +    private final StringBuilder lineSoFar = new StringBuilder(16);
    +
    +    private LoggingOutputStream(Builder builder) {
    +        super(builder.out != null ? builder.out : NOOP_OUTPUT_STREAM);
    +        log = builder.log;
    +        logPrefix = (builder.logPrefix != null) ? builder.logPrefix : "";
    +    }
    +
    +    @Override
    +    public void write(int b) throws IOException {
    +        if (running.get() && b >= 0) onChar(b);
    +        out.write(b);
    +    }
    +
    +    @Override
    +    public void flush() throws IOException {
    +        try {
    +            if (lineSoFar.length() > 0) {
    +                onLine(lineSoFar.toString());
    +                lineSoFar.setLength(0);
    --- End diff --

    > Is your concern about leaking that the lineSoFar might now have a very large internal array in StringBuilder.value, which won't be GC'ed until the LoggingOutputStream is GC'ed?

    Yes, that's what I meant.

    > Maybe I'll compromise and say that if the size was >= 1024 then I'll trimToSize.

    Sounds like a good balance. Alternatively, you could re-create the `StringBuilder` object with size 16 if you feel that's a better metric.


> More efficient thread usage for ssh execution
> ---------------------------------------------
>
>                 Key: BROOKLYN-440
>                 URL: https://issues.apache.org/jira/browse/BROOKLYN-440
>             Project: Brooklyn
>          Issue Type: Improvement
>    Affects Versions: 0.10.0
>            Reporter: Aled Sage
>            Priority: Minor
>
> For consuming the stdout/stderr from ssh execution, our use of {{PipedInputStream}}/{{PipedOutputStream}} and {{StreamGobbler}} looks very inefficient (my fault - I wrote it originally!)
> We normally consume 6 threads per ssh execution:
> 1. The calling thread of {{SshMachineLocation.execScript}} is blocked, waiting for it to complete.
> 2. Within sshj, there is a "sftp reader" thread that reads the packets.
> 3. Within {{SshjTool.ExecAction}}, we create a {{StreamGobbler}} thread to read the ssh stdout, as it is made available.
> 4. Same for stderr.
> 5. Within {{ExecWithLoggingHelpers.execWithLogging}}, we create a {{StreamGobbler}} thread to read + log the ssh stdout, as it is made available.
> 6. Same for stderr.
> I'm pretty sure we can get rid of threads 5 and 6; not sure about 3 and 4 though.
> Here is the chain of actions:
> * We call something like {{sshMachineLocation.execScript}}. This can include "out" and "err" config, to obtain the exec stdout/stderr.
> * {{SshMachineLocation}} wraps the command execution in {{ExecWithLoggingHelpers.execWithLogging}}.
> * In order to log the stdout (and same for stderr):
>   * It creates a {{PipedOutputStream}} and {{PipedInputStream}}. It sets the {{PipedOutputStream}} as the stdout to use (i.e. config passed to {{SshjTool}}).
>   * It creates a {{StreamGobbler}}, which is a thread that consumes the {{PipedInputStream}} - this logs each line, and also writes each line to the original "out".
> * Within {{SshjTool.ExecAction}}, it has an sshj {{Session.Command.getInputStream()}} and {{.getErrorStream()}} for reading the stdout and stderr.
>   It creates a {{StreamGobbler}}, which is a thread to consume these input streams; it writes the bytes received from that input stream to the {{out}} and {{err}} streams passed in.
> For the logging, a simpler and more efficient approach would be to wrap the OutputStream. See {{com.google.common.io.CountingOutputStream}} for inspiration. It should be as simple as extending {{java.io.FilterOutputStream}}, and overriding the {{write}} and {{close}} methods. The implementation of these methods would call the wrapped outputStream as well as doing something very similar to {{StreamGobbler.onChar()}}.
> I don't see a way to do the same trick inside {{SshjTool.ExecAction}}, unfortunately, without changes to sshj to wrap {{ChannelInputStream}} which is created inside {{net.schmizz.sshj.connection.channel.AbstractChannel}}'s constructor (or to override {{AbstractChannel.receiveInto}} perhaps). None of those seem worth it.
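
The wrapping approach suggested in the description, combined with the buffer compromise from the review comment, might look roughly like the sketch below. It is not the code from the pull request. `LineLoggingOutputStream`, `bufferCapacity()` and the two constants are hypothetical names, and a `Consumer<String>` stands in for the slf4j `Logger` so the example has no dependencies. The sketch also assumes that each byte is one character (only right for single-byte encodings) and that either `\n` or `\r` ends a line, with blank lines suppressed.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Sketch only: forwards every byte to the wrapped stream and hands complete
 * lines to a callback on the writer's own thread, so no extra reader thread
 * is involved. The Consumer<String> sink is an assumption standing in for a Logger.
 */
class LineLoggingOutputStream extends FilterOutputStream {
    static final int INITIAL_CAPACITY = 16;    // initial buffer size used in the diff
    static final int SHRINK_THRESHOLD = 1024;  // threshold floated in the review comment

    private final Consumer<String> lineSink;
    private StringBuilder lineSoFar = new StringBuilder(INITIAL_CAPACITY);

    LineLoggingOutputStream(OutputStream out, Consumer<String> lineSink) {
        super(out);
        this.lineSink = lineSink;
    }

    @Override
    public void write(int b) throws IOException {
        onChar(b & 0xFF);   // assumption: one byte == one character
        out.write(b);       // always pass the byte through unchanged
    }

    @Override
    public void flush() throws IOException {
        emitLine();         // push out any trailing partial line
        out.flush();
    }

    /** Exposed only so the shrinking behaviour can be observed. */
    int bufferCapacity() {
        return lineSoFar.capacity();
    }

    private void onChar(int c) {
        if (c == '\n' || c == '\r') {
            emitLine();     // either newline character ends the current line
        } else {
            lineSoFar.append((char) c);
        }
    }

    private void emitLine() {
        if (lineSoFar.length() == 0) {
            return;         // suppress blank lines
        }
        lineSink.accept(lineSoFar.toString());
        if (lineSoFar.capacity() >= SHRINK_THRESHOLD) {
            // A long line grew the backing array; drop it so it can be collected.
            lineSoFar = new StringBuilder(INITIAL_CAPACITY);
        } else {
            lineSoFar.setLength(0);  // keeps the (small) backing array for reuse
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> logged = new ArrayList<>();
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (LineLoggingOutputStream los = new LineLoggingOutputStream(sink, logged::add)) {
            los.write("hello\nworld".getBytes(StandardCharsets.US_ASCII));
        } // close() flushes, which emits the trailing partial line
        System.out.println(logged);
        System.out.println(sink.toString("US-ASCII"));
    }
}
```

Because `FilterOutputStream.write(byte[], int, int)` delegates to `write(int)` and `close()` calls `flush()`, overriding just these two methods is enough here, at the cost of byte-at-a-time writes to the wrapped stream. Re-creating the builder gives a deterministic capacity; calling `trimToSize()` after `setLength(0)` would be the other option raised in the review. The class is not synchronized, so it assumes a single writer.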
> Below is a trimmed down jstack from executing {{sleep 100}} over ssh:
> {noformat}
> 2017-02-13 11:09:35
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.80-b11 mixed mode):
> "main" prio=5 tid=0x00007f8ba180a000 nid=0x1c03 waiting on condition [0x0000700006b21000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000007f4069868> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
>     at net.schmizz.concurrent.Promise.tryRetrieve(Promise.java:170)
>     at net.schmizz.concurrent.Promise.retrieve(Promise.java:137)
>     at net.schmizz.concurrent.Event.await(Event.java:103)
>     at net.schmizz.sshj.connection.channel.AbstractChannel.join(AbstractChannel.java:259)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:1009)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:926)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:627)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:613)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$1.run(SshjTool.java:327)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.execScript(SshjTool.java:329)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$1.exec(ExecWithLoggingHelpers.java:83)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:168)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:165)
>     at org.apache.brooklyn.util.pool.BasicPool.exec(BasicPool.java:146)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation.execSsh(SshMachineLocation.java:601)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation$13.execWithTool(SshMachineLocation.java:780)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execWithLogging(ExecWithLoggingHelpers.java:165)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execScript(ExecWithLoggingHelpers.java:81)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:764)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:758)
>     at org.apache.brooklyn.location.ssh.SshMachineLocationIntegrationTest.testSlowForVisualInspection(SshMachineLocationIntegrationTest.java:98)
> "sftp reader" prio=5 tid=0x00007f8ba43a6800 nid=0x6503 in Object.wait() [0x0000700008571000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f3f15690> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f3f15690> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at net.schmizz.sshj.sftp.PacketReader.readIntoBuffer(PacketReader.java:51)
>     at net.schmizz.sshj.sftp.PacketReader.getPacketLength(PacketReader.java:59)
>     at net.schmizz.sshj.sftp.PacketReader.readPacket(PacketReader.java:75)
>     at net.schmizz.sshj.sftp.PacketReader.run(PacketReader.java:87)
> "Thread-7" prio=5 tid=0x00007f8ba1a9f800 nid=0x6903 in Object.wait() [0x0000700008777000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f4069948> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f4069948> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
>     - locked <0x00000007f4069930> (a [B)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "Thread-6" prio=5 tid=0x00007f8ba1b2b000 nid=0x6703 in Object.wait() [0x0000700008674000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f4061698> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f4061698> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
>     - locked <0x00000007f4061680> (a [B)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "reader" prio=5 tid=0x00007f8ba385c000 nid=0x6303 runnable [0x000070000846e000]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.SocketInputStream.socketRead0(Native Method)
>     at java.net.SocketInputStream.read(SocketInputStream.java:152)
>     at java.net.SocketInputStream.read(SocketInputStream.java:122)
>     at net.schmizz.sshj.transport.Reader.run(Reader.java:50)
> "brooklyn-execmanager-SrVbReTz-3" daemon prio=5 tid=0x00007f8ba21eb000 nid=0x6103 waiting on condition [0x000070000836b000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000007fc0088e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
>     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> "Thread-2" prio=5 tid=0x00007f8ba21ea000 nid=0x5f03 in Object.wait() [0x0000700008268000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007fc018088> (a java.io.PipedInputStream)
>     at java.io.PipedInputStream.read(PipedInputStream.java:327)
>     - locked <0x00000007fc018088> (a java.io.PipedInputStream)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "Thread-1" prio=5 tid=0x00007f8ba20e7800 nid=0x5d03 in Object.wait() [0x0000700008165000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007fc020088> (a java.io.PipedInputStream)
>     at java.io.PipedInputStream.read(PipedInputStream.java:327)
>     - locked <0x00000007fc020088> (a java.io.PipedInputStream)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> {noformat}
> ([~svet] I spoke to you about this previously, and believe you have some comments?)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)