spark-dev mailing list archives

From Steve Loughran <ste...@hortonworks.com>
Subject Re: Adding HDFS read-time metrics per task (RE: SPARK-1683)
Date Thu, 12 May 2016 19:12:50 GMT

On 12 May 2016, at 04:44, Brian Cho <chobrian@gmail.com> wrote:

Hi Kay,

Thank you for the detailed explanation.

If I understand correctly, I *could* time each record's processing by measuring the time
spent in reader.next, but this would add overhead for every single record. And this is the
approach that was abandoned because of performance regressions.

The other possibility is changing HDFS first. This method looks promising even if it takes
some time. I'll play around with it a bit for now. Thanks again!

-Brian

On Wed, May 11, 2016 at 4:45 PM, Kay Ousterhout <keo@eecs.berkeley.edu> wrote:
Hi Brian,

Unfortunately it's not possible to do this in Spark for two reasons.  First, we read records
in Spark one at a time (e.g., if you're reading an HDFS file and performing some map function,
one record will be read from HDFS, then the map function will be applied, then the next record
will be read, etc.). The relevant code is here: <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L209>;
we create an iterator that's then passed on to other downstream RDDs.  As a result, we'd need
to time each record's processing, which adds too much overhead.
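
For illustration only (a sketch, not Spark's actual HadoopRDD code): per-record timing would
mean wrapping the record iterator roughly like this, paying two System.nanoTime() calls for
every single record, which is exactly the overhead being described.

  // Illustrative wrapper that times every next() call.
  class TimedIterator[T](underlying: Iterator[T]) extends Iterator[T] {
    private var readTimeNanos = 0L

    override def hasNext: Boolean = underlying.hasNext

    override def next(): T = {
      val start = System.nanoTime()
      val record = underlying.next()      // may trigger the actual HDFS read
      readTimeNanos += System.nanoTime() - start
      record
    }

    def totalReadTimeNanos: Long = readTimeNanos
  }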

The other potential issue is that we use the RecordReader interface, which means that we get
deserialized and decompressed records, so any time we measured would include time to read
the data from disk and decompress/deserialize it (not sure if you're trying to isolate the
disk time).

Measuring decompression overhead alone is interesting. Indeed, with encryption at rest and
erasure coding in Hadoop, you'd want to isolate that work too, to see where the bottlenecks
move after a switch to SSDs.


It *is* possible to do this instrumentation for disk read time in HDFS, because HDFS reads
larger blocks from disk (and then passes them to Spark one by one), and I did that (in a hacky
way) in the most recent commits in this Hadoop branch: <https://github.com/kayousterhout/hadoop-common/commits/2.0.2-instrumented>.
 I filed a Hadoop JIRA <https://issues.apache.org/jira/browse/HADOOP-11873> to add this
(in a less hacky way, using FileSystem.Statistics) but haven't submitted a patch for it. 
If there's sufficient interest, I could properly implement the metrics and see if it could
be merged into Hadoop, at which point Spark could start reading those metrics (unfortunately,
the delay for this would be pretty significant because we'd need to wait for a new Hadoop
version and then a new Spark version, and it would only be available in newer versions of
Hadoop).
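
For anyone following along, a rough sketch of the Spark-side consumption: FileSystem.Statistics
already exposes per-scheme byte counters that a task could poll once at the end rather than per
record; a disk-read-time counter (what HADOOP-11873 asks for) does not exist yet, so only bytes
read is shown here.

  import org.apache.hadoop.fs.FileSystem
  import scala.collection.JavaConverters._

  // Sketch: poll the per-scheme FileSystem.Statistics counters once per task.
  // A read-time counter would first have to be added on the Hadoop side.
  def hdfsBytesReadSoFar(): Long =
    FileSystem.getAllStatistics.asScala
      .filter(_.getScheme == "hdfs")
      .map(_.getBytesRead)
      .sum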

The metrics API changed 19 hours ago into something more sophisticated, though it doesn't
measure timings.

https://issues.apache.org/jira/browse/HADOOP-13065

It's designed to be more extensible; you ask for a metric by name rather than a compile-time
field... this lets different filesystems add different values.
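
Roughly, querying by name looks like the sketch below. The API literally just changed, so treat
the exact calls as an assumption, and "bytesRead" as an illustrative key rather than a guaranteed
one.

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.FileSystem

  // Sketch against the StorageStatistics style from HADOOP-13065:
  // metrics are looked up by string key, so each filesystem can publish its own set.
  val fs = FileSystem.get(new Configuration())
  val stats = fs.getStorageStatistics
  val maybeBytesRead = Option(stats.getLong("bytesRead"))   // null if the key is unknown
  val it = stats.getLongStatistics
  while (it.hasNext) {
    val s = it.next()
    println(s"${s.getName} = ${s.getValue}")
  }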

A few minutes ago, https://issues.apache.org/jira/browse/HADOOP-13028 went in to do some metric
work for Spark; there the stats can be printed in logs, because the filesystem and input stream
toString() methods return the metrics. That's for people, not machines: the text may break
without warning. But you can at least dump the metrics in your logs to see what's going on.
That stuff can be seen in downstream tests, but not directly published as metrics. The aggregate
stats are also collected as metrics2 stats, which should somehow be convertible to Coda Hale
metrics and hence integrate with the rest of Spark's monitoring.
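
A minimal example of that log-dumping pattern (whether a given filesystem or stream actually
includes statistics in its toString() depends on the implementation; the path is made up):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Sketch: dump the human-readable statistics via toString().
  // The text format is unstable, so it's only suitable for logs, not parsing.
  val fs = FileSystem.get(new Configuration())
  val in = fs.open(new Path("/tmp/example"))      // hypothetical path
  try {
    val buf = new Array[Byte](4096)
    while (in.read(buf) >= 0) {}                  // drain the stream
  } finally {
    in.close()
  }
  println(s"stream stats: $in")   // stream toString() may include its metrics
  println(s"fs stats: $fs")       // ditto for the filesystem instance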


A more straightforward action might just be for Spark itself to subclass FilterFileSystem
and implement operation timing there, both for the filesystem operations themselves and for
any input/output streams returned in create() & open().
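
Something like the sketch below (a hypothetical class, not existing Spark code): it times the
open() call itself; per-read timing would additionally mean wrapping the returned
FSDataInputStream so each read() is bracketed with System.nanoTime().

  import java.util.concurrent.atomic.AtomicLong
  import org.apache.hadoop.fs.{FileSystem, FilterFileSystem, FSDataInputStream, Path}

  // Hypothetical wrapper: a FilterFileSystem that times open() calls
  // on the underlying filesystem.
  class TimedFileSystem(underlying: FileSystem) extends FilterFileSystem(underlying) {
    val openTimeNanos = new AtomicLong()

    override def open(path: Path, bufferSize: Int): FSDataInputStream = {
      val start = System.nanoTime()
      try underlying.open(path, bufferSize)
      finally openTimeNanos.addAndGet(System.nanoTime() - start)
    }
  }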

