pig-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Pig Wiki] Update of "PigMemory" by AlanGates
Date Fri, 15 May 2009 00:30:38 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The following page has been changed by AlanGates:

New page:
= Pig Memory Usage Improvement =

== Problem Statement ==

 1. Pig hogs memory.  In version 0.2.0, data expands by a factor of 2-3x when it is loaded from disk into memory.  This makes it hard for Pig to execute users' programs efficiently.  The expansion is caused largely by the extensive use of Java objects (Integer, etc.) to store internal data.
 1. Java memory management and its garbage collector are poorly suited to the workload of intensive data processing.  Pig needs better control over where data is stored and when memory is deallocated.  For a complete discussion of this issue see M. A. Shah et al., ''Java Support for Data-Intensive Systems:  Experiences Building the Telegraph Dataflow System''.

== Proposed Solution ==
Switching from using Java containers and objects to using large memory buffers and a page
cache will address both of these issues.

=== Architecture ===
A basic page caching mechanism will be built.  Traditional OS page caches use fairly small
pages (often a few kilobytes).  This cache will use large pages (default size around 10M or so).
Large pages are chosen for two reasons:

 1. It is convenient to constrain all of the scalar values in a tuple to fit into one buffer, and doing so requires large buffers.
 1. Data in Pig tends to be written all at once and read only once or twice.  In addition, reads generally scan all of the output of an operator.  So there should be relatively little need to bring pages in and out of memory repeatedly.

The page cache will consist of a !MemoryManager singleton, !TupleBuffers, and
!DataBuffers.  Each !TupleBuffer will contain a !DataBuffer, along with the logic to spill
that !DataBuffer to disk and read it back.  The !MemoryManager will track the !TupleBuffers
that are available for use and the !TupleBuffers that are full and eligible to be flushed
to disk.  It will also track the total number of !DataBuffers in memory.  The total memory
used by !DataBuffers, and therefore the number of !DataBuffers that can be created, will be
bounded by a configuration value.  It should default to something like 250M (or maybe 70% of
JVM memory, if we can determine how much memory the whole JVM has been allocated).
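
To make the bound concrete, here is a small sketch that turns a memory budget into a maximum number of !DataBuffers, assuming 10M buffers.  The class and method names are hypothetical.  It computes the count both from a fixed 250M budget and from 70% of the JVM's maximum heap as reported by Runtime.maxMemory().

```java
// Sketch: turning a memory budget into a bound on the number of DataBuffers.
// Class and method names are hypothetical.
class BufferSizing {
    static final long MB = 1024L * 1024L;

    // Whole buffers that fit in the budget: at least one, capped at Integer.MAX_VALUE.
    static int maxDataBuffers(long budgetBytes, long bufferBytes) {
        long n = Math.max(1, budgetBytes / bufferBytes);
        return (int) Math.min(Integer.MAX_VALUE, n);
    }

    // The alternative bound: a fraction of the maximum heap the JVM may use.
    static long heapBudget(double fraction) {
        return (long) (Runtime.getRuntime().maxMemory() * fraction);
    }

    public static void main(String[] args) {
        long bufferBytes = 10 * MB;   // "around 10M"
        System.out.println(maxDataBuffers(250 * MB, bufferBytes));        // prints 25
        System.out.println(maxDataBuffers(heapBudget(0.7), bufferBytes)); // depends on -Xmx
    }
}
```

With 10M buffers, a 250M budget allows 25 !DataBuffers; the heap-based figure depends on the JVM's -Xmx setting.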

When a !TupleBuffer needs a new !DataBuffer (either to store new data or to read
flushed data back in from disk), it will request one from the !MemoryManager.  If
the maximum number of !DataBuffers already exist, the !MemoryManager will select, via an
LRU algorithm, a !TupleBuffer that is full and in memory and ask it to flush its data
to disk.  Once that is done, the !MemoryManager will take the freed !DataBuffer and
return it to the !TupleBuffer that originally requested it.
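
The allocation policy above can be sketched as a small single-threaded pool.  This is an illustration under simplifying assumptions, not the planned !MemoryManager: a hypothetical Owner class stands in for a !TupleBuffer, a !LinkedHashSet keeps full owners in least-recently-used order, and spilling only marks the owner instead of writing to disk.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashSet;

// Single-threaded sketch of the allocation policy. "Owner" stands in for a
// TupleBuffer; spilling only marks the owner. All names are hypothetical.
class LruBufferPool {
    static final class Owner {
        final String name;
        byte[] buffer;      // null while the owner's data is spilled
        boolean spilled;
        Owner(String name) { this.name = name; }
    }

    private final int maxBuffers;
    private final int bufferSize;
    private int created;
    private final Deque<byte[]> free = new ArrayDeque<>();
    // Full, in-memory owners, least recently used first.
    private final LinkedHashSet<Owner> full = new LinkedHashSet<>();

    LruBufferPool(int maxBuffers, int bufferSize) {
        this.maxBuffers = maxBuffers;
        this.bufferSize = bufferSize;
    }

    // Record that an owner is full (or was just used): move it to the most recent end.
    void markFull(Owner o) {
        full.remove(o);
        full.add(o);
    }

    // Return a buffer that is no longer needed to the free list.
    void release(byte[] buffer) {
        free.push(buffer);
    }

    // Reuse a free buffer, else create one under the limit, else spill the
    // least recently used full owner and hand its buffer to the caller.
    byte[] acquire() {
        if (!free.isEmpty()) return free.pop();
        if (created < maxBuffers) {
            created++;
            return new byte[bufferSize];
        }
        Iterator<Owner> it = full.iterator();
        if (!it.hasNext()) throw new IllegalStateException("no full buffer to spill");
        Owner victim = it.next();
        it.remove();
        // A real implementation would write victim.buffer to disk here.
        byte[] reclaimed = victim.buffer;
        victim.buffer = null;
        victim.spilled = true;
        return reclaimed;
    }

    public static void main(String[] args) {
        LruBufferPool pool = new LruBufferPool(2, 16);
        Owner a = new Owner("a"), b = new Owner("b"), c = new Owner("c");
        a.buffer = pool.acquire(); pool.markFull(a);
        b.buffer = pool.acquire(); pool.markFull(b);
        pool.markFull(a);          // a is used again, so b becomes least recently used
        c.buffer = pool.acquire(); // limit reached: b is spilled
        System.out.println("a spilled: " + a.spilled + ", b spilled: " + b.spilled);
    }
}
```

Running main prints "a spilled: false, b spilled: true", because touching a again left b as the least recently used full buffer.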

In addition to this page caching system, new implementations of the Tuple and
Bag interfaces will be written.  !ManagedTuple will store scalar objects and maps in
!TupleBuffers as serialized data.  It will obtain !TupleBuffers from the !MemoryManager,
thus allowing many tuples to share one !TupleBuffer.  This should significantly reduce the
memory footprint.  A single tuple will be stored entirely within one !TupleBuffer.  As data
is written to a !TupleBuffer, some space will be reserved in the buffer to allow the tuples
in that buffer to expand.  (Question: do we really need this?  How often do tuples grow?
The one case I can think of is when a value in a tuple is cast from one type to another.)
If a tuple cannot fit entirely in a given !TupleBuffer, it will request a new !TupleBuffer
to store its data in.  The !ManagedTuple will store an array of offsets into the
!TupleBuffer's !DataBuffer, one for each scalar and map field.  References to tuples and bags
will be stored in a separate array of Objects.  There is no requirement that the tuples and
bags inside a tuple use the same !TupleBuffer as the outer tuple.
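
The sharing rules in the previous paragraph (many tuples per buffer, each tuple entirely within one buffer, headroom reserved for growth) can be sketched as follows.  The class names and the 0.8 fill factor are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: tuples share a buffer, each tuple lives entirely in one buffer, and
// only the first fillFactor of a buffer is used for new tuples so the rest is
// left for growth. Names and the 0.8 factor are assumptions.
class SharedBuffers {
    static final class Buf {
        final byte[] data;
        int next;                        // first free offset
        Buf(int size) { data = new byte[size]; }
    }

    // Where a stored tuple lives: its buffer plus one offset per field.
    static final class Placement {
        final Buf buf;
        final int[] offsets;
        Placement(Buf buf, int[] offsets) { this.buf = buf; this.offsets = offsets; }
    }

    final int bufferSize;
    final int newTupleLimit;             // bytes usable by new tuples
    final List<Buf> buffers = new ArrayList<>();

    SharedBuffers(int bufferSize, double fillFactor) {
        this.bufferSize = bufferSize;
        this.newTupleLimit = (int) (bufferSize * fillFactor);
    }

    Placement store(byte[][] fields) {
        int total = 0;
        for (byte[] f : fields) total += f.length;
        if (total > bufferSize) throw new IllegalArgumentException("tuple larger than a buffer");
        Buf cur = buffers.isEmpty() ? null : buffers.get(buffers.size() - 1);
        // The whole tuple must fit below the limit; otherwise start a new buffer.
        if (cur == null || cur.next + total > newTupleLimit) {
            cur = new Buf(bufferSize);
            buffers.add(cur);
        }
        int[] offsets = new int[fields.length];
        for (int i = 0; i < fields.length; i++) {
            offsets[i] = cur.next;
            System.arraycopy(fields[i], 0, cur.data, cur.next, fields[i].length);
            cur.next += fields[i].length;
        }
        return new Placement(cur, offsets);
    }
}
```

With 100-byte buffers and a 0.8 fill factor, four 20-byte tuples share the first buffer (the fourth at offsets 60 and 70), and the fifth starts a new buffer.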

!ManagedTuple will handle translating between serialized data in !TupleBuffers and the Java
objects used in the Tuple interface.  Hopefully, with boxing for numeric types, this will be
relatively fast.  Strings and maps will be somewhat slower.  Changes will also be necessary
to !DataByteArray to allow it to reference underlying data with an offset and a length, so
that it need not copy bytes in and out of the !TupleBuffer.  A new implementation of Map
will also be necessary.  This !ManagedMap will lay out its keys and values in a !TupleBuffer
and support reading values from that serialized data.  In the future we will investigate
adding non-object get methods for numeric types to the Tuple interface, so that a tuple can
return its data without going through an object conversion.  Arithmetic operators, which
operate on numbers anyway, could use this interface.
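
As a rough sketch of the two optimizations mentioned above, the code below contrasts a non-object getInt accessor with the boxed get path, and shows an offset-and-length view over shared bytes in the spirit of the proposed !DataByteArray change.  The names, layout, and type code are hypothetical, and java.nio.!ByteBuffer is used only for convenience.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Sketch with hypothetical names and type code:
//  - getInt reads a numeric field straight from the serialized bytes, while
//    get goes through a boxed Object as the Tuple interface requires;
//  - ByteView references part of a shared byte[] by offset and length,
//    the way a modified DataByteArray could, instead of copying the bytes.
class FieldAccess {
    static final byte INT_TYPE = 10;    // hypothetical type code

    // Layout: 1 type byte followed by a 4-byte big-endian int.
    static int writeInt(ByteBuffer buf, int offset, int value) {
        buf.put(offset, INT_TYPE);
        buf.putInt(offset + 1, value);
        return offset + 5;              // next free offset
    }

    // Non-object accessor: no Integer is allocated.
    static int getInt(ByteBuffer buf, int offset) {
        if (buf.get(offset) != INT_TYPE) throw new IllegalStateException("not an int field");
        return buf.getInt(offset + 1);
    }

    // Object accessor: the int is autoboxed to an Integer.
    static Object get(ByteBuffer buf, int offset) {
        return getInt(buf, offset);
    }

    static final class ByteView {
        final byte[] data;
        final int offset;
        final int length;

        ByteView(byte[] data, int offset, int length) {
            if (offset < 0 || length < 0 || offset + length > data.length)
                throw new IndexOutOfBoundsException("view outside the array");
            this.data = data;
            this.offset = offset;
            this.length = length;
        }

        byte get(int i) {
            if (i < 0 || i >= length) throw new IndexOutOfBoundsException("index " + i);
            return data[offset + i];
        }

        // Copies only when a caller really needs its own array.
        byte[] toArray() {
            return Arrays.copyOfRange(data, offset, offset + length);
        }
    }
}
```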

To facilitate this translation to objects, !ManagedTuple will store one byte with
each field that records the type of the field and whether or not the field is null.
In the future we can investigate an optimized version of !ManagedTuple that takes a schema
and thus avoids the need to store type information for every tuple.  In this case null
information could be stored efficiently in a bitmap.
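
One possible encoding of the per-field byte, sketched under assumptions (the bit layout and type codes are hypothetical): the low seven bits hold the type and the high bit marks null.  The schema-based alternative keeps only a null bitmap, shown here with java.util.!BitSet.

```java
import java.util.BitSet;

// Sketch of a one-byte field header: low 7 bits = type code, high bit = null.
// Type codes are hypothetical. nullBitmap shows the schema-based alternative,
// where types come from the schema and only null flags are kept per tuple.
class FieldHeader {
    static final int NULL_FLAG = 0x80;
    static final int TYPE_MASK = 0x7F;

    static byte encode(int typeCode, boolean isNull) {
        if (typeCode < 0 || typeCode > TYPE_MASK) {
            throw new IllegalArgumentException("type code out of range: " + typeCode);
        }
        return (byte) (typeCode | (isNull ? NULL_FLAG : 0));
    }

    static int typeOf(byte header) {
        return header & TYPE_MASK;
    }

    static boolean isNull(byte header) {
        return (header & NULL_FLAG) != 0;
    }

    // One bit per field instead of one byte per field.
    static BitSet nullBitmap(Object[] fields) {
        BitSet nulls = new BitSet(fields.length);
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) nulls.set(i);
        }
        return nulls;
    }
}
```

Seven bits leave room for 128 type codes, so the null flag costs nothing extra in the one-byte scheme; with a schema, the per-field overhead drops to a single bit.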

=== Detailed Design ===

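    /**
     * This is just a wrapper so a byte[] can be stored in a container.  The
     * entire class is package level, as no one outside of the data package
     * should be interacting with it.
     */
    class DataBuffer {

        // The property is given in KB; the default of roughly 10M follows
        // the Architecture section.
        static final int DEFAULT_SIZE_KB = 10 * 1024;
        static final int bufferSize;

        static {
            String bufsize = System.getProperty("pig.memory.databuffer.size",
                String.valueOf(DEFAULT_SIZE_KB));
            bufferSize = Integer.parseInt(bufsize) * 1024;
        }

        final byte[] data = new byte[bufferSize];
    }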

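    import java.io.DataInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.UncheckedIOException;

    /**
     * A buffer to manage large byte arrays, count the tuples referencing them,
     * and manage spilling them to disk.  The entire class is package level, as
     * no one outside of the data package should be interacting with it.
     */
    class TupleBuffer {

        // Fraction of the buffer that new tuples may fill; the rest is left so
        // tuples already in the buffer can grow.  This should be configurable
        // and probably default to around 80%.  We need to play with it and
        // see what's optimal.
        static double fillFactor = 0.8;

        // Number of tuples with references into the buffer.
        private int refCnt;
        private int nextOffset;
        // Package level so others can see it without the overhead of a read call.
        DataBuffer data;
        File diskCache;

        TupleBuffer() {
            data = MemoryManager.getMemoryManager().getDataBuffer();
        }

        void recycle() {
            diskCache = null;
            nextOffset = 0;
            refCnt = 0;
        }

        /** Called by a tuple when it is going to store its data in this TupleBuffer. */
        void attach() {
            refCnt++;
        }

        /** Called by a tuple when it is deallocated. */
        void detach() {
            if (--refCnt < 1) {
                // No tuple references this buffer any more, so it can be reused.
                MemoryManager.getMemoryManager().recycle(this);
            }
        }

        /**
         * Write data to the buffer.
         * @param bytes bytes to write to the buffer
         * @return offset where the written data starts, or -1 if there is
         *         insufficient space
         */
        int write(byte[] bytes) {
            if (nextOffset + bytes.length > DataBuffer.bufferSize) {
                return -1;
            }
            int start = nextOffset;
            System.arraycopy(bytes, 0, data.data, start, bytes.length);
            nextOffset += bytes.length;
            return start;
        }

        /**
         * Bring the buffer into memory.  This must be called
         * before the tuple begins to read the data.
         */
        void bringIntoMemory() {
            if (diskCache == null) {
                return;  // already in memory
            }
            data = MemoryManager.getMemoryManager().getDataBuffer();
            try (DataInputStream in = new DataInputStream(new FileInputStream(diskCache))) {
                in.readFully(data.data, 0, nextOffset);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            diskCache = null;
            // Push the buffer back onto the full queue so it can be
            // flushed again if necessary.
            MemoryManager.getMemoryManager().markFull(this);
        }

        /**
         * Determine if there is space in the buffer for a new tuple.
         * @param size estimated size (in bytes) of the new tuple
         * @return true if there's room
         */
        boolean isSpaceForNew(int size) {
            assert diskCache == null : "buffer must be in memory before tuples are added";
            boolean isSpace =
                (double) (nextOffset + size) / DataBuffer.bufferSize < fillFactor;
            if (!isSpace) MemoryManager.getMemoryManager().markFull(this);
            return isSpace;
        }

        /**
         * Determine if there is space in the buffer to expand
         * an existing tuple.
         * @param size bytes to write to the buffer.  This size
         *        should be accurate, or at least a guaranteed upper bound.
         * @return true if there's room
         */
        boolean isSpaceForGrowth(int size) {
            return nextOffset + size <= DataBuffer.bufferSize;
        }

        /**
         * Flush the data to disk.
         * @return the DataBuffer that held the data, now free for reuse
         */
        DataBuffer flush() {
            try {
                // The temp-file name is illustrative.
                diskCache = File.createTempFile("pig-tuplebuffer", ".spill");
                try (FileOutputStream out = new FileOutputStream(diskCache)) {
                    out.write(data.data, 0, nextOffset);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            DataBuffer freed = data;
            data = null;
            return freed;
        }
    }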


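    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.LinkedHashSet;
    import java.util.Set;

    /**
     * A class to manage all of the memory and data buffers.  The entire class
     * is package level, as no one outside of the data package should be
     * interacting with it.
     * This class tracks TupleBuffers and DataBuffers.  We can create as many
     * TupleBuffers as necessary, but the number of DataBuffers is bounded
     * by the memory the system makes available.
     */
    class MemoryManager {

        private static MemoryManager self = null;

        // TupleBuffers that can take new tuples; many tuples share each one.
        private final Set<TupleBuffer> availableTupleBuffers = new LinkedHashSet<>();
        // Full TupleBuffers in least recently used order; the head is flushed first.
        private final Deque<TupleBuffer> fullTupleBuffers = new ArrayDeque<>();
        private final Deque<DataBuffer> availableDataBuffers = new ArrayDeque<>();
        private int numCreatedDataBuffers;
        private final int maxDataBuffers;

        static synchronized MemoryManager getMemoryManager() {
            if (self == null) {
                self = new MemoryManager();
            }
            return self;
        }

        private MemoryManager() {
            // Determine available memory based on configuration and set
            // maxDataBuffers.  As a stand-in default, use 70% of the maximum
            // heap the JVM may use.
            long budget = (long) (Runtime.getRuntime().maxMemory() * 0.7);
            long count = Math.max(1, budget / DataBuffer.bufferSize);
            maxDataBuffers = (int) Math.min(Integer.MAX_VALUE, count);
        }

        /**
         * Mark a TupleBuffer as no longer available to store new tuples.
         * It may still have room to grow tuples currently stored in it.
         * The TupleBuffer will also be put on the full list so that it
         * is eligible for flushing to disk if necessary.
         * @param tb TupleBuffer to take off the available list
         */
        void markFull(TupleBuffer tb) {
            availableTupleBuffers.remove(tb);
            // Removing and re-adding moves tb to the most recently used end.
            fullTupleBuffers.remove(tb);
            fullTupleBuffers.addLast(tb);
        }

        /**
         * Mark a TupleBuffer as available to take new tuples.  All
         * existing data in this TupleBuffer will be dropped.
         */
        void recycle(TupleBuffer tb) {
            fullTupleBuffers.remove(tb);
            if (tb.data == null) {
                // Its data was flushed to disk, so it needs a DataBuffer again.
                tb.data = getDataBuffer();
            }
            tb.recycle();
            availableTupleBuffers.add(tb);
        }

        /**
         * Get a TupleBuffer.  If there's one to use on the available list
         * then use it, otherwise create a new one.  Available buffers stay
         * on the list, so many tuples can share the same TupleBuffer.
         */
        TupleBuffer getTupleBuffer() {
            if (!availableTupleBuffers.isEmpty()) {
                return availableTupleBuffers.iterator().next();
            }
            TupleBuffer tb = new TupleBuffer();
            availableTupleBuffers.add(tb);
            return tb;
        }

        /**
         * Get a DataBuffer.  If there's one to use on the available list
         * then return it.  Otherwise, if we have not yet reached the maximum
         * number of data buffers, create a new one.  As a last resort, tell
         * the least recently used full TupleBuffer to flush and then use its
         * DataBuffer.
         */
        DataBuffer getDataBuffer() {
            if (!availableDataBuffers.isEmpty()) {
                return availableDataBuffers.pop();
            }
            if (numCreatedDataBuffers < maxDataBuffers) {
                numCreatedDataBuffers++;
                return new DataBuffer();
            }
            TupleBuffer victim = fullTupleBuffers.pollFirst();
            if (victim == null) {
                throw new IllegalStateException("all DataBuffers are in use and none can be flushed");
            }
            return victim.flush();
        }
    }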


    import java.util.Arrays;
    import java.util.Map;

    /**
     * An implementation of Tuple that works with managed memory.  It stores
     * complex types (tuples, bags, maps) as objects in an internal array.
     * Scalar types are laid out in a TupleBuffer as raw data.  The format
     * of the layout is 1 byte for type, 4 bytes for length (byte array and
     * char array types only), then the data.
     * The serialization helpers toBytes, fromBytes, and moveToNewTupleBuffer
     * are hypothetical and not shown, nor are the remaining Tuple methods.
     */
    class ManagedTuple implements Tuple {

        // Reference to the TupleBuffer holding data for this tuple.
        TupleBuffer buf;

        // Offsets into the tuple buffer for each field.  A negative value
        // indicates that the field is a complex type, and -(value + 1) is its
        // index into the complexFields array, so complex index 0 is stored as
        // -1 and never collides with a scalar at offset 0.  This is stored as
        // an array to avoid the overhead of an ArrayList.
        private int[] fieldOffsets = new int[0];

        // Array of complex fields (tuples, bags, maps) in this tuple.  Stored
        // as an array to avoid the overhead of an ArrayList.
        private Object[] complexFields = new Object[0];

        // Do not add any more member variables.  We want to keep the memory
        // footprint of ManagedTuple to an absolute minimum.

        ManagedTuple() {
            buf = MemoryManager.getMemoryManager().getTupleBuffer();
            buf.attach();
        }

        @Override
        protected void finalize() {
            // Drop this tuple's reference so the TupleBuffer can be recycled.
            buf.detach();
        }

        public void append(Object val) {
            fieldOffsets = Arrays.copyOf(fieldOffsets, fieldOffsets.length + 1);
            if (isComplex(val)) {
                complexFields = Arrays.copyOf(complexFields, complexFields.length + 1);
                complexFields[complexFields.length - 1] = managed(val);
                fieldOffsets[fieldOffsets.length - 1] = -complexFields.length;
            } else {
                // Type byte, [4-byte length], data.
                byte[] bytes = toBytes(val);
                buf.bringIntoMemory();
                if (!buf.isSpaceForGrowth(bytes.length)) {
                    // Move the tuple to a new TupleBuffer before adding the field.
                    moveToNewTupleBuffer(bytes.length);
                }
                fieldOffsets[fieldOffsets.length - 1] = buf.write(bytes);
            }
        }

        public Object get(int fieldNum) {
            int offset = fieldOffsets[fieldNum];
            if (offset < 0) {
                return complexFields[-offset - 1];
            }
            // The data must be in memory before the tuple reads it.  The type
            // byte at the offset determines which object to instantiate.
            buf.bringIntoMemory();
            return fromBytes(buf.data.data, offset);
        }

        public void set(int fieldNum, Object val) {
            if (isComplex(val)) {
                // May need to grow complexFields if this field was previously
                // a scalar or null.
                complexFields[-fieldOffsets[fieldNum] - 1] = managed(val);
            } else {
                byte[] bytes = toBytes(val);
                buf.bringIntoMemory();
                if (!buf.isSpaceForGrowth(bytes.length)) {
                    moveToNewTupleBuffer(bytes.length);
                }
                fieldOffsets[fieldNum] = buf.write(bytes);
            }
        }

        private static boolean isComplex(Object val) {
            return val instanceof Tuple || val instanceof DataBag || val instanceof Map;
        }

        // Maps are copied into a ManagedMap so their keys and values are laid
        // out in managed memory.
        @SuppressWarnings("unchecked")
        private static Object managed(Object val) {
            if (!(val instanceof Map)) {
                return val;
            }
            ManagedMap m = new ManagedMap();
            m.putAll((Map<String, Object>) val);
            return m;
        }
    }

    import java.util.Map;

    /**
     * An implementation of Map for use with managed memory.  Package level
     * access because it will only be used by ManagedTuple.
     */
    class ManagedMap implements Map<String, Object> {

        // Reference to the TupleBuffer holding data for this map.
        private TupleBuffer buf;

        public Object put(String key, Object val) {
            // Keys and values are laid out once; existing data can't be amended.
            throw new UnsupportedOperationException("ManagedMap cannot be modified");
        }

        public void putAll(Map<? extends String, ? extends Object> m) {
            // Write keys and values into buf, using lengths to delineate them.
            // Values are serialized the same way as in ManagedTuple, with a type byte.
        }

        public Object get(Object key) {
            // Look through the data, skipping from key to key until we find the
            // right key, then deserialize its value into the appropriate
            // object type and return it.
        }

        // Remaining Map methods are not shown.
    }


A new class !ManagedBag will be created that does not extend !DefaultAbstractBag.  This bag
will not support spilling.  It will use as few member variables as possible to keep its
memory footprint to a minimum.
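
A minimal sketch of the idea, assuming a plain growable array is enough: the bag below has only two member variables and no spill support.  The class name !CompactBag is hypothetical, and it does not implement Pig's !DataBag interface.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch of a bag with a small per-instance footprint: two member variables,
// no spilling, and empty bags share one static array. The name and the
// doubling growth policy are assumptions; a real ManagedBag would implement
// Pig's DataBag interface.
class CompactBag<T> implements Iterable<T> {
    private static final Object[] EMPTY = new Object[0];

    private Object[] items = EMPTY;
    private int size;

    void add(T t) {
        if (size == items.length) {
            items = Arrays.copyOf(items, Math.max(4, size * 2));
        }
        items[size++] = t;
    }

    long size() {
        return size;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int i;

            @Override
            public boolean hasNext() {
                return i < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public T next() {
                if (i >= size) throw new NoSuchElementException();
                return (T) items[i++];
            }
        };
    }
}
```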

== Proposed Methodology ==

As they say in the financial world, this document contains many statements that are forward
looking and may or may not work out.  We
should prototype this along the way to assure ourselves that this will in fact bring the proposed
improvement while maintaining

Step one should be to prototype !ManagedTuple and !ManagedMap.  Then a standalone tool can
be written that creates tuples until it runs out of memory.  It can then be run with the
existing !DefaultTuples and with !ManagedTuples to see whether there is a significant
improvement.  We expect the improvement to be on the order of cutting the memory footprint
in half.  This test should be run with all-scalar data and with data that includes maps and
bags of maps (since this is a common use case for our users).
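
The comparison tool could be sketched as follows.  Rather than running until an !OutOfMemoryError, this version (an assumption, chosen so it can run safely) counts how many objects fit before used heap grows by a fixed budget.  The two factories are stand-ins; the real tool would create !DefaultTuples and !ManagedTuples.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Sketch of the standalone comparison tool. The stand-in "tuples" are
// assumptions: three boxed values in an Object[] versus the same values
// packed into a byte[] as type byte + data.
class FootprintProbe {
    static long usedHeap() {
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
    }

    // How many objects were created before used heap grew by budgetBytes
    // (or maxCount was reached).
    static int countUntilBudget(Supplier<Object> factory, long budgetBytes, int maxCount) {
        List<Object> live = new ArrayList<>();  // keeps every object reachable
        System.gc();                             // only a hint to the JVM
        long baseline = usedHeap();
        while (live.size() < maxCount && usedHeap() - baseline < budgetBytes) {
            live.add(factory.get());
        }
        return live.size();
    }

    static Object boxed() {
        return new Object[] { Integer.valueOf(1 << 20), Long.valueOf(1L << 40), Double.valueOf(0.5) };
    }

    // Type byte + data for each field (type codes are hypothetical).
    static Object packed() {
        byte[] b = new byte[(1 + 4) + (1 + 8) + (1 + 8)];
        ByteBuffer.wrap(b).put((byte) 10).putInt(1 << 20)
                .put((byte) 15).putLong(1L << 40)
                .put((byte) 25).putDouble(0.5);
        return b;
    }

    public static void main(String[] args) {
        long budget = 64L * 1024 * 1024;  // needs a heap comfortably larger than this
        System.out.println("boxed:  " + countUntilBudget(FootprintProbe::boxed, budget, 10_000_000));
        System.out.println("packed: " + countUntilBudget(FootprintProbe::packed, budget, 10_000_000));
    }
}
```

Because totalMemory() - freeMemory() is only approximate and includes garbage not yet collected, the counts should be compared over several runs rather than read as exact sizes.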

Assuming step one shows promising results, the next step should be to prototype the page
caching system.  Queries that we know produce GC overhead errors ("GC overhead limit
exceeded") in the existing code should be run with the page caching system to show that it
handles them without excessive GC overhead.

== Thoughts for Future Work ==
In the Telegraph paper referenced above, the developers went a step further and managed not
just the large memory buffers but also the creation and deletion of objects.  This meant a
couple of things:

 1. They needed to add explicit deallocation to their programming.  
 1. They needed to keep pools of available objects for recycling.

We may want to look at these options, though 1 in particular may be difficult, as Java
programmers (especially those who haven't programmed in C++ or a similar language) are not
used to deallocating objects when they are finished with them.  However, the Telegraph
developers noted that by totally circumventing the Java garbage collector they got roughly
a 2.5x speedup of their system.  So it might be worth it.