curator-user mailing list archives

From Jordan Zimmerman <jor...@jordanzimmerman.com>
Subject Re: Multiple consumers on a single server - strange behavior.
Date Sun, 17 Nov 2013 21:56:08 GMT

The call to getChildren() just to find out how many children there are is very expensive.
Instead, call checkExists() on the parent path and use the Stat object returned. The Stat
object has a getNumChildren() method.
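
A minimal sketch of that pattern (using Curator's TestingServer; the path matches the PATH constant in the code below, but the client and retry settings here are illustrative):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.data.Stat;

public class QueueSizeExample {
    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            client.blockUntilConnected();

            // Simulate a queue with two items.
            client.create().forPath("/test_queue");
            client.create().forPath("/test_queue/item-0");
            client.create().forPath("/test_queue/item-1");

            // One round trip; the server returns only the parent's Stat,
            // not the (possibly huge) list of child names.
            Stat stat = client.checkExists().forPath("/test_queue");
            int size = (stat == null) ? 0 : stat.getNumChildren();
            System.out.println("queue size = " + size); // prints "queue size = 2"

            client.close();
        }
    }
}
```

getNumChildren() gives the same count as getChildren().size() but without transferring every child name over the wire on each poll.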

-Jordan

On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <bs4mailinglist@gmail.com> wrote:

> First of all, thank you for your answer.
> 
> Here is the simple code, I used:
> 
> The producer and queue consumer classes are given below.
> 
> Every 5 minutes, I print the number of processed items, and I see some drastic differences between the different consumers:
> 
> 
> 
> Producer:
> =-=-=-=-=
> 
> package com.zk;
> 
> import java.io.Closeable;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.List;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.api.CuratorEvent;
> import org.apache.curator.framework.api.CuratorListener;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> 
> public class QueueProducer implements Closeable {
>     
>     final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>     
>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>     
>     protected static final String PATH = "/test_queue";
> 
>     protected static final String LOCK_PATH = "/test_lock_queue";
>     
>     private DistributedQueue<CrawlUrl> queue;
>     
>     private static final int QUEUE_SIZE = 100000;
>     
>     private int items;
> 
>     public QueueProducer(CuratorFramework framework) throws Exception {
>         LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>         System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>         this.queue = Utils.newDistributedQueue(framework,
>                 Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
>         this.queue.start();
>         addQueueContent(QUEUE_SIZE);
>         System.out.println("Done with the initial init");
> 
> 
>         // We register to the listener for monitoring the number of elements
>         // in the queue
>         framework.getCuratorListenable().addListener(new CuratorListener() {
>             @Override
>             public void eventReceived(final CuratorFramework framework_,
>                     CuratorEvent event) throws Exception {
>                 if (event.getPath() != null && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>                     // this also restores the notification
>                     List<String> children = framework_.getChildren()
>                             .watched().forPath(Utils.CRAWL_QUEUE_PATH);
>                     if (children.size() <= QUEUE_SIZE/2) {
>                         addQueueContent(QUEUE_SIZE - children.size());
>                     }
>                 }
>             }
>         });
> 
> 
>         while (true) {
>             List<String> children = framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>             if (children.size() <= QUEUE_SIZE/2) {
>                 LOG.info(dateFormat.format(new Date()) + " - In the while(true) - We call for size " + children.size());
>                 addQueueContent(QUEUE_SIZE - children.size());
>             }            
>                 
>             Thread.sleep(5000);
> 
>         }
>     }
> 
>     void addQueueContent(int numberOfItems) {
>         LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
>         for (int i = 0; i < numberOfItems; i++) {
>             try {
>                 CrawlUrl url = new CrawlUrl(""+this.items++);
>                 this.queue.put(url);
>             } catch (Exception e) {
>                 LOG.error("Caught an error when adding item " + i + " in addQueueContent()", e);
>             }
>         }
>     }
>     
>     public static void main(String[] args) {
>         CrawlerPropertyFile props;
>         try {
>             props = new CrawlerPropertyFile(args[0]);
> 
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
> 
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
> 
>             @SuppressWarnings("unused")
>             QueueProducer producer = new QueueProducer(framework);
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
> 
>     }
> 
>     @Override
>     public void close() throws IOException {
>         this.queue.close();
>     }
>     
>     
> 
> }
> 
> 
> 
> 
> Consumer 
> =-=-=-=-=-
> 
> package com.zk;
> 
> import java.io.Closeable;
> import java.io.File;
> import java.io.FileWriter;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.framework.recipes.queue.QueueConsumer;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> 
> 
> public class MyQueueConsumer implements Closeable{
>     
>     private DistributedQueue<CrawlUrl> queue;
> 
>     String name;
> 
>     String id;
> 
>     FileWriter timeCounter;
>     
>     final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);
>     
>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>     
>     int numberOfProcessedURL;
> 
>     private FileWriterThread timeCounterThread;
>     
>     private class FileWriterThread extends Thread {
> 
>         public FileWriterThread() {
>             // empty ctor
>         }
> 
>         @Override
>         public void run() {
>             // We write the stats:
> 
>             try {
>                 while (true) {
>                     MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " " +
>                             "[numberOfProcessed=" + MyQueueConsumer.this.numberOfProcessedURL + "]\n");
>                     MyQueueConsumer.this.timeCounter.flush();
>                 
>                     // Sleeps 5 minutes
>                     Thread.sleep(300000);
>                 }
>             } catch (Exception e) {
>                 // TODO Auto-generated catch block
>                 e.printStackTrace();
>             }
>         }
>     }
>     
>     
>     public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
>         this.id = id;
>         this.name = java.net.InetAddress.getLocalHost().getHostName();
>         this.timeCounter = new FileWriter(new File("MyQueueConsumer_" + this.name + "_" + id + "_timeCounter.txt"));
>         
> //        this.timeCounterThread = new FileWriterThread();
> //        this.timeCounterThread.start();
>         this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new QueueConsumer<CrawlUrl>() {
> 
>             @Override
>             public void stateChanged(CuratorFramework client, ConnectionState newState) {
>                 System.out.println(String.format("[%s] connection state changed to %s", id, newState));
>             }
> 
>             @Override
>             public void consumeMessage(CrawlUrl url) throws Exception {
>                 try {
>                     LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + " [" + id + "-" + MyQueueConsumer.this.name + "] processed " + url.url);
>                     MyQueueConsumer.this.numberOfProcessedURL++;
>                 } catch (Exception e) {
>                     LOG.error("[" + id + "-" + MyQueueConsumer.this.name + "] " + e.getMessage() + " for url " + url.url);
>                 } 
>             }
> 
>         });
>         try {
>             this.queue.start();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
> 
>     }
>     
>     public static void main(String[] args) {
>         try {
>             CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>     
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
>     
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
>     
>             final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>     
>             for (int i = 0; i < queueConsumers.length; i++) {
>                 queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>             }
>     
>             Runtime.getRuntime().addShutdownHook(new Thread() {
>                 @Override
>                 public void run() {
>                     // close workers
>                     Throwable t = null;
>                     LOG.info("We close the workers");
>                     for (MyQueueConsumer queueConsumer : queueConsumers) {
>                         try {
>                             queueConsumer.close();
>                         } catch (Throwable th) {
>                             if (t == null) {
>                                 t = th;
>                             }
>                         }
>                     }
>                     // throw first exception that we encountered
>                     if (t != null) {
>                         throw new RuntimeException("some workers failed to close", t);
>                     }
>                 }
>             });
>             
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
> 
>     @Override
>     public void close() throws IOException {
>         this.queue.close();
>     }
> }
> 
> 
> 
> 
> Main
> -=-=-
> 
> package com.zk;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.test.TestingServer;
> 
> 
> 
> public class QueueTestMain {
> 
>     /**
>      * @param args
>      */
>     public static void main(String[] args) {
>         CrawlerPropertyFile props;
>         try {
>             props = new CrawlerPropertyFile(args[0]);
> 
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
> 
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
>             
> 
>             if (args[1] != null && args[1].equalsIgnoreCase("true")) {
>                 @SuppressWarnings("unused")
>                 QueueProducer producer = new QueueProducer(framework);
>             } else {
>             
>                 final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>                 
>                 for (int i = 0; i < queueConsumers.length; i++) {
>                     queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>                 }
>         
>                 Runtime.getRuntime().addShutdownHook(new Thread() {
>                     @Override
>                     public void run() {
>                         // close workers
>                         Throwable t = null;
>                         for (MyQueueConsumer queueConsumer : queueConsumers) {
>                             try {
>                                 queueConsumer.close();
>                             } catch (Throwable th) {
>                                 if (t == null) {
>                                     t = th;
>                                 }
>                             }
>                         }
>                         // throw first exception that we encountered
>                         if (t != null) {
>                             throw new RuntimeException("some workers failed to close", t);
>                         }
>                     }
>                 });
>     
>                 
>             }
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
> 
>     }
> 
> }
> 
> 
> 
> 
> Example of output:
> 
> 
> 
> 
> 
> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <jordan@jordanzimmerman.com> wrote:
> Can you produce a test that shows this? Anything else interesting in the log? Of course, there could be a bug.
> 
> -Jordan
> 
> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <bs4mailinglist@gmail.com> wrote:
> 
> > Hi
> >
> > I made a short test as following:
> >
> > - I have a quorum of 3 nodes for ZooKeeper.
> > - I wrote a class using Curator's QueueProducer that produces items (random integers) continuously: when the queue drops to 10% full, it creates new items.
> > - I wrote a simple class using Curator's QueueConsumer that simply prints "consumed item i" to the log.
> >
> > I tested some different combinations :
> > - running the consumers on one, two or three nodes.
> > - running one or more consumers in parallel on a given node.
> >
> >
> > But, and here is my question: I see some very strange behavior when I have several consumers in parallel on a node. For example, running 5 consumers per node on 3 nodes, the throughput is **very** slow. Looking at my log, I see that most of the consumers are idle most of the time....
> >
> > Am I making a mistake somewhere?
> > I was expecting to increase the throughput by adding more consumers; I am surprised to see the opposite....
> >
> > Thanks a lot
> >
> > Benjamin
> 
> 

