curator-user mailing list archives

From Jordan Zimmerman <jor...@jordanzimmerman.com>
Subject Re: Multiple consumers on a single server - strange behavior.
Date Sun, 17 Nov 2013 21:33:14 GMT
I’ll look further at this, but the first thing that I notice is that you are doing “work”
in your Curator Listener. Please read Curator Tech Note 1:

	https://cwiki.apache.org/confluence/display/CURATOR/TN1

The quickest fix would be to do the getChildren() as a background operation. Alternatively,
you can pass in a thread pool when registering the listener.
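
For example, a rough sketch only (untested against your classes; the "pool" executor and the "listener" variable below are placeholders for your own objects):

	// Alternative 1: issue getChildren() in the background. The call returns
	// immediately and the children arrive later as a CHILDREN event (via
	// event.getChildren()) in your CuratorListener, so the event thread never blocks.
	framework.getChildren().watched().inBackground().forPath(Utils.CRAWL_QUEUE_PATH);

	// Alternative 2: register the listener with its own executor so that any
	// work the listener does runs off Curator's event thread.
	java.util.concurrent.ExecutorService pool = java.util.concurrent.Executors.newSingleThreadExecutor();
	framework.getCuratorListenable().addListener(listener, pool);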

-Jordan

On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <bs4mailinglist@gmail.com> wrote:

> First of all, thank you for your answer.
> 
> Here is the simple code I used:
> 
> The producer and queue consumer are given in the classes below.
> 
> Every 5 minutes, I am printing the number of processed items, and I see some drastic differences between the different consumers:
> 
> 
> 
> Producer:
> =-=-=-=-=
> 
> package com.zk;
> 
> import java.io.Closeable;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.List;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.api.CuratorEvent;
> import org.apache.curator.framework.api.CuratorListener;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> 
> public class QueueProducer implements Closeable {
>     
>     final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>     
>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>     
>     protected static final String PATH = "/test_queue";
> 
>     protected static final String LOCK_PATH = "/test_lock_queue";
>     
>     private DistributedQueue<CrawlUrl> queue;
>     
>     private static final int QUEUE_SIZE = 100000;
>     
>     private int items;
> 
>     public QueueProducer(CuratorFramework framework) throws Exception {
>         LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>         System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>         this.queue = Utils.newDistributedQueue(framework,
>                 Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
>         this.queue.start();
>         addQueueContent(QUEUE_SIZE);
>         System.out.println("Done with the initial init");
> 
> 
>         // We register to the listener for monitoring the number of elements
>         // in the queue
>         framework.getCuratorListenable().addListener(new CuratorListener() {
>             @Override
>             public void eventReceived(final CuratorFramework framework_,
>                     CuratorEvent event) throws Exception {
>                 if (event.getPath() != null && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>                     // this also restores the notification
>                     List<String> children = framework_.getChildren()
>                             .watched().forPath(Utils.CRAWL_QUEUE_PATH);
>                     if (children.size() <= QUEUE_SIZE/2) {
>                         addQueueContent(QUEUE_SIZE - children.size());
>                     }
>                 }
>             }
>         });
> 
> 
>         while (true) {
>             List<String> children = framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>             if (children.size() <= QUEUE_SIZE/2) {
>                 LOG.info(dateFormat.format(new Date()) + " - In the while(true) - We call for size " + children.size());
>                 addQueueContent(QUEUE_SIZE - children.size());
>             }            
>                 
>             Thread.sleep(5000);
> 
>         }
>     }
> 
>     void addQueueContent(int numberOfItems) {
>         LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
>         for (int i = 0; i < numberOfItems; i++) {
>             try {
>                 CrawlUrl url = new CrawlUrl(""+this.items++);
>                 this.queue.put(url);
>             } catch (Exception e) {
>                 LOG.error ("Caught an error when adding the item " + i + " in the initQueueContent()");
>             }
>         }
>     }
>     
>     public static void main(String[] args) {
>         CrawlerPropertyFile props;
>         try {
>             props = new CrawlerPropertyFile(args[0]);
> 
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
> 
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
> 
>             @SuppressWarnings("unused")
>             QueueProducer producer = new QueueProducer(framework);
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
> 
>     }
> 
>     @Override
>     public void close() throws IOException {
>         this.queue.close();
>     }
>     
>     
> 
> }
> 
> 
> 
> 
> Consumer 
> =-=-=-=-=-
> 
> package com.zk;
> 
> import java.io.Closeable;
> import java.io.File;
> import java.io.FileWriter;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.framework.recipes.queue.QueueConsumer;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> 
> 
> public class MyQueueConsumer implements Closeable{
>     
>     private DistributedQueue<CrawlUrl> queue;
> 
>     String name;
> 
>     String id;
> 
>     FileWriter timeCounter;
>     
>     final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);
>     
>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>     
>     int numberOfProcessedURL;
> 
>     private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>     
>     private class FileWriterThread extends Thread {
> 
>         public FileWriterThread() {
>             // empty ctor
>         }
> 
>         @Override
>         public void run() {
>             // We write the stats:
> 
>             try {
>                 while (true) {
>                     MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " " +
>                             "[numberOfProcessed=" + MyQueueConsumer.this.numberOfProcessedURL + "]\n");
>                     MyQueueConsumer.this.timeCounter.flush();
>                 
>                     // Sleeps 5 minutes
>                     Thread.sleep(300000);
>                 }
>             } catch (Exception e) {
>                 // TODO Auto-generated catch block
>                 e.printStackTrace();
>             }
>         }
>     }
>     
>     
>     public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
>         this.id = id;
>         this.name = java.net.InetAddress.getLocalHost().getHostName();
>         this.timeCounter = new FileWriter(new File("MyQueueConsumer_" + this.name + "_" + id + "_timeCounter.txt"));
>         
> //        this.timeCounterThread = new FileWriterThread();
> //        this.timeCounterThread.start();
>         this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH,
>                 new QueueConsumer<CrawlUrl>() {
> 
>             @Override
>             public void stateChanged(CuratorFramework client, ConnectionState newState) {
>                 System.out.println(String.format("[%s] connection state changed to %s", id, newState));
>             }
> 
>             @Override
>             public void consumeMessage(CrawlUrl url) throws Exception {
>                 try {
>                     LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "[" + id + "-" + MyQueueConsumer.this.name + "] processed " + url.url);
>                     MyQueueConsumer.this.numberOfProcessedURL++;
>                 } catch (Exception e) {
>                     LOG.error("[" + id + "-" + MyQueueConsumer.this.name + "]" + e.getMessage() + " for url " + url.url);
>                 } 
>             }
> 
>         });
>         try {
>             this.queue.start();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
> 
>     }
>     
>     public static void main(String[] args) {
>         try {
>             CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>     
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
>     
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
>     
>             final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>     
>             for (int i = 0; i < queueConsumers.length; i++) {
>                 queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>             }
>     
>             Runtime.getRuntime().addShutdownHook(new Thread() {
>                 @Override
>                 public void run() {
>                     // close workers
>                     Throwable t = null;
>                     LOG.info("We close the workers");
>                     for (MyQueueConsumer queueConsumer : queueConsumers) {
>                         try {
>                             queueConsumer.close();
>                         } catch (Throwable th) {
>                             if (t == null) {
>                                 t = th;
>                             }
>                         }
>                     }
>                     // throw first exception that we encountered
>                     if (t != null) {
>                         throw new RuntimeException("some workers failed to close", t);
>                     }
>                 }
>             });
>             
>         }catch (Exception e ){
>             e.printStackTrace();
>         }
>     }
> 
>     @Override
>     public void close() throws IOException {
>         this.queue.close();
>     }
> }
> 
> 
> 
> 
> Main
> -=-=-
> 
> package com.zk;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.test.TestingServer;
> 
> 
> 
> public class QueueTestMain {
> 
>     /**
>      * @param args
>      */
>     public static void main(String[] args) {
>         CrawlerPropertyFile props;
>         try {
>             props = new CrawlerPropertyFile(args[0]);
> 
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
> 
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
>             
> 
>             if (args[1] != null && args[1].equalsIgnoreCase("true")) {
>                 @SuppressWarnings("unused")
>                 QueueProducer producer = new QueueProducer(framework);
>             } else {
>             
>                 final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>                 
>                 for (int i = 0; i < queueConsumers.length; i++) {
>                     queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>                 }
>         
>                 Runtime.getRuntime().addShutdownHook(new Thread() {
>                     @Override
>                     public void run() {
>                         // close workers
>                         Throwable t = null;
>                         for (MyQueueConsumer queueConsumer : queueConsumers) {
>                             try {
>                                 queueConsumer.close();
>                             } catch (Throwable th) {
>                                 if (t == null) {
>                                     t = th;
>                                 }
>                             }
>                         }
>                         // throw first exception that we encountered
>                         if (t != null) {
>                             throw new RuntimeException("some workers failed to close", t);
>                         }
>                     }
>                 });
>     
>                 
>             }
>         }catch (Exception e ){
>             e.printStackTrace();
>         }
> 
>     }
> 
> }
> 
> 
> 
> 
> Example of output:
> 
> 
> 
> 
> 
> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <jordan@jordanzimmerman.com> wrote:
> Can you produce a test that shows this? Anything else interesting in the log? Of course, there could be a bug.
> 
> -Jordan
> 
> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <bs4mailinglist@gmail.com> wrote:
> 
> > Hi
> >
> > I made a short test as follows:
> >
> > - I have a quorum of 3 nodes for ZooKeeper.
> > - I wrote a class (QueueProducer) using Curator that produces items (random integers) all the time: when the queue is 10% full, it creates new items.
> > - I wrote a simple class using a Curator QueueConsumer that simply prints "consumed item i" to the log.
> >
> > I tested some different combinations :
> > - running the consumers on one, two or three nodes.
> > - running one or more consumers in parallel on a given node.
> >
> >
> > But, and here is my question: I see some very strange behavior when I have several consumers in parallel on a node. For example, running 5 consumers per node on 3 nodes, I see a **very** slow throughput. When looking at my log, I see that most of the consumers are idle most of the time....
> >
> > Am I making a mistake somewhere?
> > I was expecting to increase the throughput by adding more consumers; I am surprised to see the opposite....
> >
> > Thanks a lot
> >
> > Benjamin
> 
> 

