curator-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sznajder ForMailingList <bs4mailingl...@gmail.com>
Subject Re: Multiple consumers on a single server - strange behavior.
Date Sun, 17 Nov 2013 21:20:04 GMT
First of all, thank you for your answer.

Here is the simple code I used:

The producer and the queue consumer are each given in their own class below.

Every 5 minutes I print the number of processed items, and I see
drastic differences between the different consumers:



Producer:
=-=-=-=-=

package com.zk;

import java.io.Closeable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class QueueProducer implements Closeable {

    final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);

    final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    protected static final String PATH = "/test_queue";

    protected static final String LOCK_PATH = "/test_lock_queue";

    private DistributedQueue<CrawlUrl> queue;

    private static final int QUEUE_SIZE = 100000;

    private int items;

    public QueueProducer(CuratorFramework framework) throws Exception {
        LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a
QueueProducer");

System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is
a QueueProducer");
        this.queue = Utils.newDistributedQueue(framework,
                Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
        this.queue.start();
        addQueueContent(QUEUE_SIZE);
        System.out.println("Done with the initial init");


        // We register to the listener for monitoring the number of elements
        // in the queue
        framework.getCuratorListenable().addListener(new CuratorListener() {
            @Override
            public void eventReceived(final CuratorFramework framework_,
                    CuratorEvent event) throws Exception {
                if (event.getPath() != null    &&
event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
                    // this also restores the notification
                    List<String> children = framework_.getChildren()
                            .watched().forPath(Utils.CRAWL_QUEUE_PATH);
                    if (children.size() <= QUEUE_SIZE/2) {
                        addQueueContent(QUEUE_SIZE - children.size());
                    }
                }
            }
        });


        while (true) {
            List<String> children =
framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
            if (children.size() <= QUEUE_SIZE/2) {
                LOG.info(dateFormat.format(new Date()) + " - In the
while(true) - We call for size " + children.size());
                addQueueContent(QUEUE_SIZE - children.size());
            }

            Thread.sleep(5000);

        }
    }

    void addQueueContent(int numberOfItems) {
        LOG.info(dateFormat.format(new Date()) + " - addQueueContent " +
numberOfItems);
        for (int i = 0; i < numberOfItems; i++) {
            try {
                CrawlUrl url = new CrawlUrl(""+this.items++);
                this.queue.put(url);
            } catch (Exception e) {
                LOG.error ("Caught an error when adding the item " + i + "
in the initQueueContent()");
            }
        }
    }

    public static void main(String[] args) {
        CrawlerPropertyFile props;
        try {
            props = new CrawlerPropertyFile(args[0]);

            final String connectString;
            System.out.println("DEBUG = " + Utils.DEBUG);
            if (props.useZkTestServer()) {
                System.out.println("Will launch from zkTestServer");
                TestingServer server = new TestingServer();
                connectString = server.getConnectString();
            } else {
                connectString = props.getZkServer();
            }

            final CuratorFramework framework =
Utils.newFramework(connectString);
            framework.start();

            @SuppressWarnings("unused")
            QueueProducer producer = new QueueProducer(framework);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    @Override
    public void close() throws IOException {
        this.queue.close();
    }



}




Consumer
=-=-=-=-=-

package com.zk;

import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



public class MyQueueConsumer implements Closeable{

    private DistributedQueue<CrawlUrl> queue;

    String name;

    String id;

    FileWriter timeCounter;

    final static Logger LOG =
LoggerFactory.getLogger(MyQueueConsumer.class);

    final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    int numberOfProcessedURL;

    private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;

    private class FileWriterThread extends Thread {

        public FileWriterThread() {
            // empty ctor
        }

        @Override
        public void run() {
            // We write the stats:

            try {
                while (true) {

MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "+

"[numberOfProcessed="+MyQueueConsumer.this.numberOfProcessedURL    +"]\n") ;
                    MyQueueConsumer.this.timeCounter.flush();

                    // Sleeps 5 minutes
                    Thread.sleep(300000);
                }
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }


    public MyQueueConsumer(CuratorFramework framework, final String id)
throws Exception {
        this.id = id;
        this.name = java.net.InetAddress.getLocalHost().getHostName();
        this.timeCounter = new FileWriter(new File("MyQueueConsumer_"+
this.name + "_" +id + "_timeCounter.txt"));

//        this.timeCounterThread = new FileWriterThread();
//        this.timeCounterThread.start();
        this.queue = Utils.newDistributedQueue(framework,
Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new
QueueConsumer<CrawlUrl>() {

            @Override
            public void stateChanged(CuratorFramework client,
ConnectionState newState) {
                System.out.println(String.format("[%s] connection state
changed to %s", id, newState));
            }

            @Override
            public void consumeMessage(CrawlUrl url) throws Exception {
                try {
                    LOG.info(dateFormat.format(new
Date(System.currentTimeMillis())) + "["+id+ "-" + MyQueueConsumer.this.name+
"] processed " + url.url);
                    MyQueueConsumer.this.numberOfProcessedURL++;
                } catch (Exception e) {
                    LOG.error( "["+id+ "-" + MyQueueConsumer.this.name+ "]"
+ e.getMessage() + " for url " + url.url );
                }
            }

        });
        try {
            this.queue.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        try {
            CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);

            final String connectString;
            System.out.println("DEBUG = " + Utils.DEBUG);
            if (props.useZkTestServer()) {
                System.out.println("Will launch from zkTestServer");
                TestingServer server = new TestingServer();
                connectString = server.getConnectString();
            } else {
                connectString = props.getZkServer();
            }

            final CuratorFramework framework =
Utils.newFramework(connectString);
            framework.start();

            final MyQueueConsumer[] queueConsumers = new
MyQueueConsumer[props.getNumberOfWorkers()];

            for (int i = 0; i < queueConsumers.length; i++) {
                queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
            }

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    // close workers
                    Throwable t = null;
                    LOG.info("We close the workers");
                    for (MyQueueConsumer queueConsumer : queueConsumers) {
                        try {
                            queueConsumer.close();
                        } catch (Throwable th) {
                            if (t == null) {
                                t = th;
                            }
                        }
                    }
                    // throw first exception that we encountered
                    if (t != null) {
                        throw new RuntimeException("some workers failed to
close", t);
                    }
                }
            });

        }catch (Exception e ){
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws IOException {
        this.queue.close();
    }
}




Main
-=-=-

package com.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;



public class QueueTestMain {

    /**
     * @param args
     */
    public static void main(String[] args) {
        CrawlerPropertyFile props;
        try {
            props = new CrawlerPropertyFile(args[0]);

            final String connectString;
            System.out.println("DEBUG = " + Utils.DEBUG);
            if (props.useZkTestServer()) {
                System.out.println("Will launch from zkTestServer");
                TestingServer server = new TestingServer();
                connectString = server.getConnectString();
            } else {
                connectString = props.getZkServer();
            }

            final CuratorFramework framework =
Utils.newFramework(connectString);
            framework.start();


            if (args[1] != null && args[1].equalsIgnoreCase("true")) {
                @SuppressWarnings("unused")
                QueueProducer producer = new QueueProducer(framework);
            } else {

                final MyQueueConsumer[] queueConsumers = new
MyQueueConsumer[props.getNumberOfWorkers()];

                for (int i = 0; i < queueConsumers.length; i++) {
                    queueConsumers[i] = new MyQueueConsumer(framework,
"id_"+i);
                }

                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        // close workers
                        Throwable t = null;
                        for (MyQueueConsumer queueConsumer :
queueConsumers) {
                            try {
                                queueConsumer.close();
                            } catch (Throwable th) {
                                if (t == null) {
                                    t = th;
                                }
                            }
                        }
                        // throw first exception that we encountered
                        if (t != null) {
                            throw new RuntimeException("some workers failed
to close", t);
                        }
                    }
                });


            }
        }catch (Exception e ){
            e.printStackTrace();
        }

    }

}




Example of output:





On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <
jordan@jordanzimmerman.com> wrote:

> Can you produce a test that shows this? Anything else interesting in the
> log? Of course, there could be a bug.
>
> -Jordan
>
> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <
> bs4mailinglist@gmail.com> wrote:
>
> > Hi
> >
> > I made a short test as following:
> >
> > - I have a quorum of 3 nodes for ZooKeeper.
> > - I wrote a class using Curator QueueProducer who produces all the time
> (when the queue is 10% full, it creates new items) , items (random integer)
> > - I wrote a simple class using Curator Queue Consumer which simply
> prints to Log "consumed item i".
> >
> > I tested some different combinations :
> > - running the consumers on one, two or three nodes.
> > - running one or more consumers in parallel on a given node.
> >
> >
> > But, and here is my question: I see some very strange behavior when I
> have several consumers in parallel on a node. For example, running 5
> consumers per node on 3 nodes, I see a **very** low throughput. When
> looking at my log, I see that most of the consumers are idle most of
> the time....
> >
> > Am I making a mistake somewhere?
> >  I was expecting to increase the throughput by increasing the number of
> consumers, so I am surprised to see the opposite....
> >
> > Thanks a lot
> >
> > Benjamin
>
>

Mime
View raw message