curator-user mailing list archives

From Sznajder ForMailingList <bs4mailingl...@gmail.com>
Subject Re: Multiple consumers on a single server - strange behavior.
Date Tue, 19 Nov 2013 23:39:22 GMT
Hi Jordan

Following your advice, I moved the processing of the event outside of the
listener.
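A minimal sketch of what moving the work out of the listener can look like, using only `java.util.concurrent`. The class `ListenerHandoff` and its `onEvent()` method are hypothetical stand-ins for a `CuratorListener.eventReceived` callback, not part of Curator. The callback only schedules a refill on a single worker thread, and an `AtomicBoolean` collapses a burst of events into one pending task instead of starting a new `Thread` per event as `QueueAdder` does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: a cheap event callback that hands slow work to one worker thread. */
class ListenerHandoff {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final AtomicBoolean refillPending = new AtomicBoolean(false);
    private final AtomicInteger refillsRun = new AtomicInteger();

    /** Stand-in for the listener callback: never blocks, only schedules work. */
    void onEvent() {
        // Schedule a refill only if none is already waiting to run.
        if (refillPending.compareAndSet(false, true)) {
            worker.execute(this::refill);
        }
    }

    /** Runs on the worker thread; the slow queue.put(...) loop would go here. */
    private void refill() {
        refillPending.set(false); // events arriving from now on may schedule another refill
        refillsRun.incrementAndGet();
    }

    int refillsRun() {
        return refillsRun.get();
    }

    /** Lets already-scheduled refills finish, waiting at most 5 seconds. */
    void shutdown() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ListenerHandoff handoff = new ListenerHandoff();
        for (int i = 0; i < 1_000; i++) {
            handoff.onEvent(); // a burst of 1000 events
        }
        handoff.shutdown();
        System.out.println("refills run: " + handoff.refillsRun());
    }
}
```

The printed count is usually far below 1000, because events that arrive while a refill is still pending do not schedule another one.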

However, I no longer receive any events.

What am I doing wrong?

Thanks a lot!

package com..hrl.zk;

import java.io.Closeable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com...crawler.CrawlUrl;
import com...crawler.Utils;
import com...main.CrawlerPropertyFile;

public class QueueProducer implements Closeable {

    final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);

    final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    DistributedQueue<CrawlUrl> queue;

    private static final int QUEUE_SIZE = 100;

    int items;

    int cnt;

    private class QueueAdder extends Thread {

        private int size;
        public QueueAdder(int size) {
            System.out.println("QueueAdder " + size);
            this.size = size;
        }

        @Override
        public void run() {
            LOG.info("QueueAdder ! " );
            for (int i = 0; i < this.size; i++ ) {
                try {
                    QueueProducer.this.queue.put(new CrawlUrl("" + QueueProducer.this.items++));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            LOG.info("DONE!");
        }
    }

    public QueueProducer(CuratorFramework framework) throws Exception {
        LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
        System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
        this.queue = Utils.newDistributedQueue(framework,
                Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
        this.queue.start();
        addQueueContent(QUEUE_SIZE);
        System.out.println("Done with the initial init");


        // We register to the listener for monitoring the number of elements
        // in the queue
        framework.getCuratorListenable().addListener(new CuratorListener() {
            @Override
            public void eventReceived(final CuratorFramework framework_,
                    CuratorEvent event) throws Exception {
                LOG.info("CNT " + QueueProducer.this.cnt);
                if (QueueProducer.this.cnt++ <= QUEUE_SIZE /2) {
                    QueueAdder queueAdder = new QueueAdder(QUEUE_SIZE - QueueProducer.this.cnt);
                    queueAdder.start();
                }

            }
        });

    }



    void addQueueContent(int numberOfItems) {
        LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
        for (int i = 0; i < numberOfItems; i++) {
            try {
                CrawlUrl url = new CrawlUrl(""+this.items++);
                this.queue.put(url);
            } catch (Exception e) {
                LOG.error("Caught an error when adding the item " + i
                        + " in the initQueueContent()");
            }
        }
    }



    @Override
    public void close() throws IOException {
        this.queue.close();
    }



}
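One plausible cause, offered as an assumption rather than a confirmed diagnosis: the earlier producer quoted further down calls `framework.getChildren().watched().forPath(...)`, which sets a ZooKeeper watch whose notification is delivered to the `CuratorListener`s, and ZooKeeper watches fire only once (hence its comment "this also restores the notification"). The version above never calls `.watched()`, so it is possible that no watch is ever set and the listener never fires. The JDK-only model below (a hypothetical class, not Curator's API) shows that one-shot behavior.

```java
/**
 * Hypothetical, ZooKeeper-free model of a one-shot watch (not Curator's API).
 * A change is reported only if a watch is armed, and reporting consumes the watch.
 */
class OneShotWatchModel {
    private final boolean rearmInHandler; // mimics "this also restores the notification"
    private boolean armed;
    private int delivered;

    OneShotWatchModel(boolean rearmInHandler) {
        this.rearmInHandler = rearmInHandler;
    }

    /** Plays the role of getChildren().watched().forPath(...): sets one watch. */
    synchronized void arm() {
        armed = true;
    }

    /** Plays the role of a child node being added or removed under the watched path. */
    synchronized void childrenChanged() {
        if (!armed) {
            return; // no watch set: the listener is never told
        }
        armed = false; // the watch is used up by this notification
        delivered++;
        if (rearmInHandler) {
            arm(); // the handler reads the children again with a watch
        }
    }

    synchronized int eventsDelivered() {
        return delivered;
    }

    public static void main(String[] args) {
        OneShotWatchModel never = new OneShotWatchModel(false);  // no watch ever set
        OneShotWatchModel once = new OneShotWatchModel(false);   // watch set once, never renewed
        once.arm();
        OneShotWatchModel renewed = new OneShotWatchModel(true); // watch renewed in the handler
        renewed.arm();
        for (int i = 0; i < 5; i++) {
            never.childrenChanged();
            once.childrenChanged();
            renewed.childrenChanged();
        }
        // prints "0 1 5"
        System.out.println(never.eventsDelivered() + " " + once.eventsDelivered() + " "
                + renewed.eventsDelivered());
    }
}
```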



On Mon, Nov 18, 2013 at 12:59 AM, Jordan Zimmerman <
jordan@jordanzimmerman.com> wrote:

> I don’t think there’s any reason to assume that each consumer will process
> an equal number of messages.
>
> -JZ
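Jordan's point can be illustrated without ZooKeeper: with competing consumers, each item goes to whichever consumer takes it first, so nothing forces an equal split. The sketch below is a hypothetical JDK-only model, not the Curator recipe: several threads drain one `LinkedBlockingQueue`, and only the total is guaranteed.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Hypothetical in-process model of competing consumers draining one queue. */
class CompetingConsumers {
    /** Returns how many items each consumer thread took. */
    static int[] drain(int items, int consumers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < items; i++) {
            queue.add(i); // fill first, so an empty poll() means the queue is drained for good
        }
        AtomicIntegerArray counts = new AtomicIntegerArray(consumers);
        Thread[] threads = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            final int id = c;
            // Each thread grabs whatever is next; nothing balances the split.
            threads[c] = new Thread(() -> {
                while (queue.poll() != null) {
                    counts.incrementAndGet(id);
                }
            });
            threads[c].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for consumers", e);
            }
        }
        int[] result = new int[consumers];
        for (int c = 0; c < consumers; c++) {
            result[c] = counts.get(c);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] counts = drain(100_000, 3);
        System.out.println(Arrays.toString(counts) + " total=" + Arrays.stream(counts).sum());
    }
}
```

Running it several times typically gives a different split each time; a thread that starts earlier or is scheduled more often simply takes more items.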
>
> On Nov 17, 2013, at 2:51 PM, Sznajder ForMailingList <
> bs4mailinglist@gmail.com> wrote:
>
> Hi Jordan,
>
> Regarding the output:
>
> As you can see, each log line shows the consumer's name, the word "processed"
> and the item.
>
> I am running the program with 4 servers in my quorum:
> ir-hadoop1 is the producer;
> ir-hadoop2 to ir-hadoop4 are consumers.
>
> After 14 minutes, I simply count the number of processed items on each
> consumer (a simple grep on the log file) and get the following:
>
> ir-hadoop2 : 3042 processed items
> ir-hadoop3 : 1276 processed items
> ir-hadoop4 : 830 processed items...
>
> Looking at the processing timestamps, I can see that ir-hadoop4 is idle
> most of the time. I attach the log for ir-hadoop4 as an example.
>
> Benjamin
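The per-consumer grep can also be done in Java. A minimal sketch, assuming each line contains the text produced by the `consumeMessage` log call quoted further down (`HH:mm:ss[id_N-host] processed item`); the class `ProcessedCounter` is hypothetical, and extra logger prefixes are tolerated because the pattern is searched with `find()`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical per-host count of "processed" log lines, replacing a manual grep. */
class ProcessedCounter {
    // Matches e.g. "12:00:01[id_0-ir-hadoop2] processed 42"; group 2 is the host name.
    private static final Pattern PROCESSED =
            Pattern.compile("\\[(id_\\d+)-([^\\]]+)\\] processed ");

    static Map<String, Integer> countByHost(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = PROCESSED.matcher(line);
            if (m.find()) {
                counts.merge(m.group(2), 1, Integer::sum);
            }
        }
        return counts;
    }

    /** Percentage of the total handled by each host. */
    static Map<String, Double> shares(Map<String, Integer> counts) {
        int total = counts.values().stream().mapToInt(Integer::intValue).sum();
        Map<String, Double> result = new TreeMap<>();
        counts.forEach((host, n) -> result.put(host, 100.0 * n / total));
        return result;
    }

    public static void main(String[] args) throws IOException {
        // args: the log files to scan, e.g. one per consumer host
        List<String> lines = new ArrayList<>();
        for (String file : args) {
            lines.addAll(Files.readAllLines(Paths.get(file)));
        }
        Map<String, Integer> counts = countByHost(lines);
        System.out.println(counts + " " + shares(counts));
    }
}
```

Applied to the counts above (3042, 1276 and 830, 5148 in total), `shares` gives about 59%, 25% and 16%.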
>
>
> On Mon, Nov 18, 2013 at 12:03 AM, Jordan Zimmerman <
> jordan@jordanzimmerman.com> wrote:
>
>> The example output is missing. Please provide that too.
>>
>> -Jordan
>>
>> On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <
>> bs4mailinglist@gmail.com> wrote:
>>
>> First of all, thank you for your answer.
>>
>> Here is the simple code I used:
>>
>> The producer and the queue consumer are given in separate classes below.
>>
>> Every 5 minutes I print the number of processed items, and I see some
>> drastic differences between the consumers:
>>
>>
>>
>> Producer:
>> =-=-=-=-=
>>
>> package com.zk;
>>
>> import java.io.Closeable;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>> import java.util.List;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.api.CuratorEvent;
>> import org.apache.curator.framework.api.CuratorListener;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>>
>> public class QueueProducer implements Closeable {
>>
>>     final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>>
>>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>
>>     protected static final String PATH = "/test_queue";
>>
>>     protected static final String LOCK_PATH = "/test_lock_queue";
>>
>>     private DistributedQueue<CrawlUrl> queue;
>>
>>     private static final int QUEUE_SIZE = 100000;
>>
>>     private int items;
>>
>>     public QueueProducer(CuratorFramework framework) throws Exception {
>>         LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>>         System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>>         this.queue = Utils.newDistributedQueue(framework,
>>                 Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
>>         this.queue.start();
>>         addQueueContent(QUEUE_SIZE);
>>         System.out.println("Done with the initial init");
>>
>>
>>         // We register to the listener for monitoring the number of elements
>>         // in the queue
>>         framework.getCuratorListenable().addListener(new CuratorListener() {
>>             @Override
>>             public void eventReceived(final CuratorFramework framework_,
>>                     CuratorEvent event) throws Exception {
>>                 if (event.getPath() != null
>>                         && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>>                     // this also restores the notification
>>                     List<String> children = framework_.getChildren()
>>                             .watched().forPath(Utils.CRAWL_QUEUE_PATH);
>>                     if (children.size() <= QUEUE_SIZE/2) {
>>                         addQueueContent(QUEUE_SIZE - children.size());
>>                     }
>>                 }
>>             }
>>         });
>>
>>
>>         while (true) {
>>             List<String> children =
>>                     framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>>             if (children.size() <= QUEUE_SIZE/2) {
>>                 LOG.info(dateFormat.format(new Date())
>>                         + " - In the while(true) - We call for size " + children.size());
>>                 addQueueContent(QUEUE_SIZE - children.size());
>>             }
>>
>>             Thread.sleep(5000);
>>
>>         }
>>     }
>>
>>     void addQueueContent(int numberOfItems) {
>>         LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
>>         for (int i = 0; i < numberOfItems; i++) {
>>             try {
>>                 CrawlUrl url = new CrawlUrl(""+this.items++);
>>                 this.queue.put(url);
>>             } catch (Exception e) {
>>                 LOG.error("Caught an error when adding the item " + i
>>                         + " in the initQueueContent()");
>>             }
>>         }
>>     }
>>
>>     public static void main(String[] args) {
>>         CrawlerPropertyFile props;
>>         try {
>>             props = new CrawlerPropertyFile(args[0]);
>>
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>>
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>
>>             @SuppressWarnings("unused")
>>             QueueProducer producer = new QueueProducer(framework);
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         this.queue.close();
>>     }
>>
>>
>>
>> }
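Two details of the producer above are worth noting: its constructor never returns, because it ends in a `while (true)` loop, and its listener calls `getChildren()` synchronously on the event thread. A minimal JDK-only sketch of the same refill rule (when the queue is at most half full, top it back up to `QUEUE_SIZE`) driven by a `ScheduledExecutorService` instead. `RefillPolicy`, `sizeProbe` and `adder` are hypothetical names; the probe is where the `getChildren().forPath(...).size()` call would go, and the adder is where `addQueueContent(...)` would go.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;

/** Hypothetical sketch of the producer's refill rule, run off the constructor's thread. */
class RefillPolicy {
    /** Items needed to get back to capacity, or 0 while the queue is more than half full. */
    static int itemsToAdd(int currentSize, int capacity) {
        return currentSize <= capacity / 2 ? capacity - currentSize : 0;
    }

    /** Checks the size every periodMillis on a background thread; the caller returns at once. */
    static ScheduledExecutorService start(IntSupplier sizeProbe, IntConsumer adder,
                                          int capacity, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                int toAdd = itemsToAdd(sizeProbe.getAsInt(), capacity);
                if (toAdd > 0) {
                    adder.accept(toAdd);
                }
            } catch (RuntimeException e) {
                // An exception escaping a periodic task suppresses all later runs, so log it here.
                e.printStackTrace();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger size = new AtomicInteger(); // stands in for the ZooKeeper queue size
        ScheduledExecutorService scheduler = start(size::get, size::addAndGet, 100, 50);
        Thread.sleep(200);
        scheduler.shutdown();
        System.out.println("queue size after refills: " + size.get());
    }
}
```

Started from the constructor, this would let `new QueueProducer(framework)` return, and keeping the returned scheduler would let `close()` shut it down.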
>>
>>
>>
>>
>> Consumer
>> =-=-=-=-=-
>>
>> package com.zk;
>>
>> import java.io.Closeable;
>> import java.io.File;
>> import java.io.FileWriter;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.framework.recipes.queue.QueueConsumer;
>> import org.apache.curator.framework.state.ConnectionState;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>>
>>
>> public class MyQueueConsumer implements Closeable{
>>
>>     private DistributedQueue<CrawlUrl> queue;
>>
>>     String name;
>>
>>     String id;
>>
>>     FileWriter timeCounter;
>>
>>     final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);
>>
>>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>
>>     int numberOfProcessedURL;
>>
>>     private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>>
>>     private class FileWriterThread extends Thread {
>>
>>         public FileWriterThread() {
>>             // empty ctor
>>         }
>>
>>         @Override
>>         public void run() {
>>             // We write the stats:
>>
>>             try {
>>                 while (true) {
>>                     MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date())
>>                             + " [numberOfProcessed="
>>                             + MyQueueConsumer.this.numberOfProcessedURL + "]\n");
>>                     MyQueueConsumer.this.timeCounter.flush();
>>
>>                     // Sleeps 5 minutes
>>                     Thread.sleep(300000);
>>                 }
>>             } catch (Exception e) {
>>                 // TODO Auto-generated catch block
>>                 e.printStackTrace();
>>             }
>>         }
>>     }
>>
>>
>>     public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
>>         this.id = id;
>>         this.name = java.net.InetAddress.getLocalHost().getHostName();
>>         this.timeCounter = new FileWriter(new File("MyQueueConsumer_"
>>                 + this.name + "_" + id + "_timeCounter.txt"));
>>
>> //        this.timeCounterThread = new FileWriterThread();
>> //        this.timeCounterThread.start();
>>         this.queue = Utils.newDistributedQueue(framework,
>>                 Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH,
>>                 new QueueConsumer<CrawlUrl>() {
>>
>>             @Override
>>             public void stateChanged(CuratorFramework client, ConnectionState newState) {
>>                 System.out.println(String.format("[%s] connection state changed to %s",
>>                         id, newState));
>>             }
>>
>>             @Override
>>             public void consumeMessage(CrawlUrl url) throws Exception {
>>                 try {
>>                     LOG.info(dateFormat.format(new Date(System.currentTimeMillis()))
>>                             + "[" + id + "-" + MyQueueConsumer.this.name
>>                             + "] processed " + url.url);
>>                     MyQueueConsumer.this.numberOfProcessedURL++;
>>                 } catch (Exception e) {
>>                     LOG.error("[" + id + "-" + MyQueueConsumer.this.name
>>                             + "]" + e.getMessage() + " for url " + url.url);
>>                 }
>>             }
>>
>>         });
>>         try {
>>             this.queue.start();
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>>     public static void main(String[] args) {
>>         try {
>>             CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>>
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>>
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>
>>             final MyQueueConsumer[] queueConsumers =
>>                     new MyQueueConsumer[props.getNumberOfWorkers()];
>>
>>             for (int i = 0; i < queueConsumers.length; i++) {
>>                 queueConsumers[i] = new MyQueueConsumer(framework, "id_" + i);
>>             }
>>
>>             Runtime.getRuntime().addShutdownHook(new Thread() {
>>                 @Override
>>                 public void run() {
>>                     // close workers
>>                     Throwable t = null;
>>                     LOG.info("We close the workers");
>>                     for (MyQueueConsumer queueConsumer : queueConsumers) {
>>                         try {
>>                             queueConsumer.close();
>>                         } catch (Throwable th) {
>>                             if (t == null) {
>>                                 t = th;
>>                             }
>>                         }
>>                     }
>>                     // throw first exception that we encountered
>>                     if (t != null) {
>>                         throw new RuntimeException("some workers failed to close", t);
>>                     }
>>                 }
>>             });
>>
>>         }catch (Exception e ){
>>             e.printStackTrace();
>>         }
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         this.queue.close();
>>     }
>> }
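In `MyQueueConsumer`, `numberOfProcessedURL` is a plain `int` incremented in `consumeMessage` and read by `FileWriterThread` (whose start is commented out above); without synchronization, the writer thread is not guaranteed to see up-to-date values. The static `SimpleDateFormat` is also shared by all consumer instances in the JVM, and `SimpleDateFormat` is not thread-safe. A minimal sketch, assuming Java 8+, of a hypothetical `ProcessedStats` helper that uses `AtomicLong`, the immutable `DateTimeFormatter`, and a `ScheduledExecutorService` in place of the sleep loop.

```java
import java.io.IOException;
import java.io.Writer;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical thread-safe replacement for the processed-items counter and its writer thread. */
class ProcessedStats {
    // DateTimeFormatter is immutable, so one instance can be shared across threads.
    private static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH:mm:ss");

    private final AtomicLong processed = new AtomicLong();

    /** Call once per consumed message, from any thread. */
    void recordProcessed() {
        processed.incrementAndGet();
    }

    /** Same line shape as the quoted FileWriterThread output. */
    String formatLine(String timestamp) {
        return timestamp + " [numberOfProcessed=" + processed.get() + "]\n";
    }

    /** Writes a stats line immediately and then every periodSeconds, on a background thread. */
    ScheduledExecutorService startReporter(Writer out, long periodSeconds) {
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
        reporter.scheduleAtFixedRate(() -> {
            try {
                out.write(formatLine(LocalTime.now().format(TIME)));
                out.flush();
            } catch (IOException e) {
                // Caught here: an exception escaping the task would suppress all later runs.
                e.printStackTrace();
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
        return reporter;
    }

    public static void main(String[] args) {
        ProcessedStats stats = new ProcessedStats();
        for (int i = 0; i < 3; i++) {
            stats.recordProcessed();
        }
        System.out.print(stats.formatLine(LocalTime.now().format(TIME)));
    }
}
```

For the 5-minute interval used in the original thread, the consumer would call something like `startReporter(timeCounter, 300)` from its constructor and shut the returned executor down in `close()`.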
>>
>>
>>
>>
>> Main
>> -=-=-
>>
>> package com.zk;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.test.TestingServer;
>>
>>
>>
>> public class QueueTestMain {
>>
>>     /**
>>      * @param args
>>      */
>>     public static void main(String[] args) {
>>         CrawlerPropertyFile props;
>>         try {
>>             props = new CrawlerPropertyFile(args[0]);
>>
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>>
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>
>>
>>             if (args[1] != null && args[1].equalsIgnoreCase("true")) {
>>                 @SuppressWarnings("unused")
>>                 QueueProducer producer = new QueueProducer(framework);
>>             } else {
>>
>>                 final MyQueueConsumer[] queueConsumers =
>>                         new MyQueueConsumer[props.getNumberOfWorkers()];
>>
>>                 for (int i = 0; i < queueConsumers.length; i++) {
>>                     queueConsumers[i] = new MyQueueConsumer(framework, "id_" + i);
>>                 }
>>
>>                 Runtime.getRuntime().addShutdownHook(new Thread() {
>>                     @Override
>>                     public void run() {
>>                         // close workers
>>                         Throwable t = null;
>>                         for (MyQueueConsumer queueConsumer : queueConsumers) {
>>                             try {
>>                                 queueConsumer.close();
>>                             } catch (Throwable th) {
>>                                 if (t == null) {
>>                                     t = th;
>>                                 }
>>                             }
>>                         }
>>                         // throw first exception that we encountered
>>                         if (t != null) {
>>                             throw new RuntimeException("some workers failed to close", t);
>>                         }
>>                     }
>>                 });
>>
>>
>>             }
>>         }catch (Exception e ){
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>> }
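In `QueueTestMain.main`, the check `args[1] != null` does not guard against a missing second argument: if the program is started with only the property file, `args[1]` throws `ArrayIndexOutOfBoundsException`, and the elements of the command-line array are never `null` anyway. A minimal sketch of a safer check; `RoleArgs.isProducer` is a hypothetical helper name.

```java
/** Hypothetical helper for the producer/consumer switch in QueueTestMain. */
class RoleArgs {
    /** True only when a second argument exists and equals "true", ignoring case. */
    static boolean isProducer(String[] args) {
        return args.length > 1 && "true".equalsIgnoreCase(args[1]);
    }

    public static void main(String[] args) {
        System.out.println(isProducer(args) ? "producer" : "consumer");
    }
}
```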
>>
>>
>>
>>
>> Example of output:
>>
>>
>>
>>
>>
>> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <
>> jordan@jordanzimmerman.com> wrote:
>>
>>> Can you produce a test that shows this? Anything else interesting in the
>>> log? Of course, there could be a bug.
>>>
>>> -Jordan
>>>
>>> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <
>>> bs4mailinglist@gmail.com> wrote:
>>>
>>> > Hi
>>> >
>>> > I made a short test as follows:
>>> >
>>> > - I have a quorum of 3 ZooKeeper nodes.
>>> > - I wrote a QueueProducer class using Curator that produces items
>>> > (random integers) all the time: when the queue is only 10% full, it
>>> > creates new items.
>>> > - I wrote a simple class using the Curator QueueConsumer, which simply
>>> > logs "consumed item i".
>>> >
>>> > I tested several combinations:
>>> > - running the consumers on one, two or three nodes;
>>> > - running one or more consumers in parallel on a given node.
>>> >
>>> >
>>> > But here is my question: I see some very strange behavior when I run
>>> > several consumers in parallel on a node. For example, with 5 consumers
>>> > per node on 3 nodes, the throughput is **very** low. Looking at my log, I
>>> > see that most of the consumers are idle most of the time.
>>> >
>>> > Am I making a mistake somewhere?
>>> > I was expecting the throughput to increase with the number of consumers,
>>> > so I am surprised to see the opposite.
>>> >
>>> > Thanks a lot
>>> >
>>> > Benjamin
>>>
>>>
>>
>>
> <ir-hadoop4_log.txt>
>
>
>
