Date: Wed, 20 Nov 2013 01:39:22 +0200
From: Sznajder ForMailingList
To: user@curator.apache.org
Subject: Re: Multiple consumers on a single server - strange behavior.

Hi Jordan

Following your advice, I moved the processing of the event outside of the
listener.

However, I do not catch any event ....

What did I write wrong?

Thanks a lot!

package com..hrl.zk;

import java.io.Closeable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com...crawler.CrawlUrl;
import com...crawler.Utils;
import com...main.CrawlerPropertyFile;

public class QueueProducer implements Closeable {

    final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);

    final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    DistributedQueue<CrawlUrl> queue;

    private static final int QUEUE_SIZE = 100;

    int items;

    int cnt;

    private class QueueAdder extends Thread {

        private int size;

        public QueueAdder(int size) {
            System.out.println("QueueAdder " + size);
            this.size = size;
        }

        @Override
        public void run() {
            LOG.info("QueueAdder ! ");
            for (int i = 0; i < this.size; i++) {
                try {
                    QueueProducer.this.queue.put(new CrawlUrl("" + QueueProducer.this.items++));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            LOG.info("DONE!");
        }
    }

    public QueueProducer(CuratorFramework framework) throws Exception {
        LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
        System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
        this.queue = Utils.newDistributedQueue(framework,
                Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
        this.queue.start();
        addQueueContent(QUEUE_SIZE);
        System.out.println("Done with the initial init");

        // We register to the listener for monitoring the number of elements
        // in the queue
        framework.getCuratorListenable().addListener(new CuratorListener() {
            @Override
            public void eventReceived(final CuratorFramework framework_,
                    CuratorEvent event) throws Exception {
                LOG.info("CNT " + QueueProducer.this.cnt);
                if (QueueProducer.this.cnt++ <= QUEUE_SIZE / 2) {
                    QueueAdder queueAdder = new QueueAdder(QUEUE_SIZE - QueueProducer.this.cnt);
                    queueAdder.start();
                }
            }
        });
    }

    void addQueueContent(int numberOfItems) {
        LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
        for (int i = 0; i < numberOfItems; i++) {
            try {
                CrawlUrl url = new CrawlUrl("" + this.items++);
                this.queue.put(url);
            } catch (Exception e) {
                LOG.error("Caught an error when adding the item " + i + " in the initQueueContent()");
            }
        }
    }

    @Override
    public void close() throws IOException {
        this.queue.close();
    }
}


On Mon, Nov 18, 2013 at 12:59 AM, Jordan Zimmerman <
jordan@jordanzimmerman.com> wrote:

> I don’t think there’s any reason to assume that each consumer will process
> an equal number of messages.
>
> -JZ
>
> On Nov 17, 2013, at 2:51 PM, Sznajder ForMailingList <
> bs4mailinglist@gmail.com> wrote:
>
> Hi Jordan..
>
> Regarding the output:
>
> As you can see, the LOG prints the name of the consumer, "processed",
> and the item.
>
> I am running the program with 4 servers in my quorum:
> ir-hadoop1 is a producer
> ir-hadoop2 --> ir-hadoop4 are consumers.
>
> After 14 minutes, I simply count the number of processed items on each
> consumer (a simplistic grep on the LOG file) and I get the following:
>
> ir-hadoop2 : 3042 processed items
> ir-hadoop3 : 1276 processed items
> ir-hadoop4 : 830 processed items...
>
> If I look at the processing times, I can see that ir-hadoop4 is idle
> most of the time... I attach here the LOG corresponding to ir-hadoop4,
> for example.
>
> Benjamin
>
>
> On Mon, Nov 18, 2013 at 12:03 AM, Jordan Zimmerman <
> jordan@jordanzimmerman.com> wrote:
>
>> The example output is missing. Please provide that too.
>>
>> -Jordan
>>
>> On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <
>> bs4mailinglist@gmail.com> wrote:
>>
>> First of all, thank you for your answer.
>>
>> Here is the simple code I used:
>>
>> The producer and the queue consumer are given in the classes below.
>>
>> Every 5 minutes, I am printing the number of processed items, and I
>> see some drastic differences between the different consumers:
>>
>>
>>
>> Producer:
>> =-=-=-=-=
>>
>> package com.zk;
>>
>> import java.io.Closeable;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>> import java.util.List;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.api.CuratorEvent;
>> import org.apache.curator.framework.api.CuratorListener;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>>
>> public class QueueProducer implements Closeable {
>>
>>     final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>>
>>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>
>>     protected static final String PATH = "/test_queue";
>>
>>     protected static final String LOCK_PATH = "/test_lock_queue";
>>
>>     private DistributedQueue<CrawlUrl> queue;
>>
>>     private static final int QUEUE_SIZE = 100000;
>>
>>     private int items;
>>
>>     public QueueProducer(CuratorFramework framework) throws Exception {
>>         LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>>         System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>>         this.queue = Utils.newDistributedQueue(framework,
>>                 Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
>>         this.queue.start();
>>         addQueueContent(QUEUE_SIZE);
>>         System.out.println("Done with the initial init");
>>
>>
>>         // We register to the listener for monitoring the number of elements
>>         // in the queue
>>         framework.getCuratorListenable().addListener(new CuratorListener() {
>>             @Override
>>             public void eventReceived(final CuratorFramework framework_,
>>                     CuratorEvent event) throws Exception {
>>                 if (event.getPath() != null && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>>                     // this also restores the notification
>>                     List<String> children = framework_.getChildren()
>>                             .watched().forPath(Utils.CRAWL_QUEUE_PATH);
>>                     if (children.size() <= QUEUE_SIZE/2) {
>>                         addQueueContent(QUEUE_SIZE - children.size());
>>                     }
>>                 }
>>             }
>>         });
>>
>>
>>         while (true) {
>>             List<String> children = framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>>             if (children.size() <= QUEUE_SIZE/2) {
>>                 LOG.info(dateFormat.format(new Date()) + " - In the while(true) - We call for size " + children.size());
>>                 addQueueContent(QUEUE_SIZE - children.size());
>>             }
>>
>>             Thread.sleep(5000);
>>
>>         }
>>     }
>>
>>     void addQueueContent(int numberOfItems) {
>>         LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
>>         for (int i = 0; i < numberOfItems; i++) {
>>             try {
>>                 CrawlUrl url = new CrawlUrl("" + this.items++);
>>                 this.queue.put(url);
>>             } catch (Exception e) {
>>                 LOG.error("Caught an error when adding the item " + i + " in the initQueueContent()");
>>             }
>>         }
>>     }
>>
>>     public static void main(String[] args) {
>>         CrawlerPropertyFile props;
>>         try {
>>             props = new CrawlerPropertyFile(args[0]);
>>
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>>
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>
>>             @SuppressWarnings("unused")
>>             QueueProducer producer = new QueueProducer(framework);
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         this.queue.close();
>>     }
>>
>> }
>>
>>
>>
>>
>> Consumer
>> =-=-=-=-=-
>>
>> package com.zk;
>>
>> import java.io.Closeable;
>> import java.io.File;
>> import java.io.FileWriter;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.framework.recipes.queue.QueueConsumer;
>> import org.apache.curator.framework.state.ConnectionState;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>>
>>
>> public class MyQueueConsumer implements Closeable {
>>
>>     private DistributedQueue<CrawlUrl> queue;
>>
>>     String name;
>>
>>     String id;
>>
>>     FileWriter timeCounter;
>>
>>     final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);
>>
>>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>
>>     int numberOfProcessedURL;
>>
>>     private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>>
>>     private class FileWriterThread extends Thread {
>>
>>         public FileWriterThread() {
>>             // empty ctor
>>         }
>>
>>         @Override
>>         public void run() {
>>             // We write the stats:
>>
>>             try {
>>                 while (true) {
>>                     MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " " +
>>                             "[numberOfProcessed=" + MyQueueConsumer.this.numberOfProcessedURL + "]\n");
>>                     MyQueueConsumer.this.timeCounter.flush();
>>
>>                     // Sleeps 5 minutes
>>                     Thread.sleep(300000);
>>                 }
>>             } catch (Exception e) {
>>                 // TODO Auto-generated catch block
>>                 e.printStackTrace();
>>             }
>>         }
>>     }
>>
>>
>>     public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
>>         this.id = id;
>>         this.name = java.net.InetAddress.getLocalHost().getHostName();
>>         this.timeCounter = new FileWriter(new File("MyQueueConsumer_" + this.name + "_" + id + "_timeCounter.txt"));
>>
>> //        this.timeCounterThread = new FileWriterThread();
>> //        this.timeCounterThread.start();
>>         this.queue = Utils.newDistributedQueue(framework,
>>                 Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new QueueConsumer<CrawlUrl>() {
>>
>>             @Override
>>             public void stateChanged(CuratorFramework client, ConnectionState newState) {
>>                 System.out.println(String.format("[%s] connection state changed to %s", id, newState));
>>             }
>>
>>             @Override
>>             public void consumeMessage(CrawlUrl url) throws Exception {
>>                 try {
>>                     LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "[" + id + "-" +
>>                             MyQueueConsumer.this.name + "] processed " + url.url);
>>                     MyQueueConsumer.this.numberOfProcessedURL++;
>>                 } catch (Exception e) {
>>                     LOG.error("[" + id + "-" + MyQueueConsumer.this.name + "]" + e.getMessage() + " for url " + url.url);
>>                 }
>>             }
>>
>>         });
>>         try {
>>             this.queue.start();
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>>     public static void main(String[] args) {
>>         try {
>>             CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>>
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>>
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>
>>             final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>>
>>             for (int i = 0; i < queueConsumers.length; i++) {
>>                 queueConsumers[i] = new MyQueueConsumer(framework, "id_" + i);
>>             }
>>
>>             Runtime.getRuntime().addShutdownHook(new Thread() {
>>                 @Override
>>                 public void run() {
>>                     // close workers
>>                     Throwable t = null;
>>                     LOG.info("We close the workers");
>>                     for (MyQueueConsumer queueConsumer : queueConsumers) {
>>                         try {
>>                             queueConsumer.close();
>>                         } catch (Throwable th) {
>>                             if (t == null) {
>>                                 t = th;
>>                             }
>>                         }
>>                     }
>>                     // throw first exception that we encountered
>>                     if (t != null) {
>>                         throw new RuntimeException("some workers failed to close", t);
>>                     }
>>                 }
>>             });
>>
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         this.queue.close();
>>     }
>> }
>>
>>
>>
>>
>> Main
>> -=-=-
>>
>> package com.zk;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.test.TestingServer;
>>
>>
>>
>> public class QueueTestMain {
>>
>>     /**
>>      * @param args
>>      */
>>     public static void main(String[] args) {
>>         CrawlerPropertyFile props;
>>         try {
>>             props = new CrawlerPropertyFile(args[0]);
>>
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>>
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>
>>
>>             if (args[1] != null && args[1].equalsIgnoreCase("true")) {
>>                 @SuppressWarnings("unused")
>>                 QueueProducer producer = new QueueProducer(framework);
>>             } else {
>>
>>                 final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>>
>>                 for (int i = 0; i < queueConsumers.length; i++) {
>>                     queueConsumers[i] = new MyQueueConsumer(framework, "id_" + i);
>>                 }
>>
>>                 Runtime.getRuntime().addShutdownHook(new Thread() {
>>                     @Override
>>                     public void run() {
>>                         // close workers
>>                         Throwable t = null;
>>                         for (MyQueueConsumer queueConsumer : queueConsumers) {
>>                             try {
>>                                 queueConsumer.close();
>>                             } catch (Throwable th) {
>>                                 if (t == null) {
>>                                     t = th;
>>                                 }
>>                             }
>>                         }
>>                         // throw first exception that we encountered
>>                         if (t != null) {
>>                             throw new RuntimeException("some workers failed to close", t);
>>                         }
>>                     }
>>                 });
>>
>>
>>             }
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>> }
>>
>>
>>
>>
>> Example of output:
>>
>>
>>
>>
>>
>> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <
>> jordan@jordanzimmerman.com> wrote:
>>
>>> Can you produce a test that shows this? Anything else interesting in the
>>> log? Of course, there could be a bug.
>>>
>>> -Jordan
>>>
>>> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <
>>> bs4mailinglist@gmail.com> wrote:
>>>
>>> > Hi
>>> >
>>> > I made a short test as follows:
>>> >
>>> > - I have a quorum of 3 nodes for ZooKeeper.
>>> > - I wrote a class using Curator QueueProducer that produces items
>>> (random integers) all the time (when the queue is 10% full, it creates
>>> new items).
>>> > - I wrote a simple class using Curator Queue Consumer which simply
>>> prints "consumed item i" to the log.
>>> >
>>> > I tested several combinations:
>>> > - running the consumers on one, two or three nodes.
>>> > - running one or more consumers in parallel on a given node.
>>> >
>>> >
>>> > But, and here is my question: I see some very strange behavior when I
>>> have several consumers in parallel on a node. For example, running 5
>>> consumers per node on 3 nodes, I see **very** slow throughput. When
>>> looking at my log, I see that most of the consumers are idle most of
>>> the time.
>>> >
>>> > Am I making a mistake somewhere?
>>> > I was expecting to improve the throughput by increasing the number of
>>> consumers, and I am surprised to see the opposite.
>>> >
>>> > Thanks a lot
>>> >
>>> > Benjamin
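
A note on the question at the top of this message ("What did I write wrong?"). The newer listener no longer calls getChildren().watched().forPath(...), which the earlier producer used to restore the watch, so that may be why no events arrive. Separately, starting a new QueueAdder thread for every event lets several refills run at once and update `items` concurrently. Below is a minimal sketch of one way to hand the refill off the listener thread. It is plain Java with no Curator dependency, and RefillSketch, itemsToAdd and onQueueSize are hypothetical names chosen for illustration, not Curator APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

/** Hypothetical helper (not part of Curator): refills a queue off the listener thread. */
final class RefillSketch implements AutoCloseable {
    private final int capacity;
    private final IntConsumer adder; // assumed callback, e.g. n -> addQueueContent(n)
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final AtomicBoolean refillPending = new AtomicBoolean(false);

    RefillSketch(int capacity, IntConsumer adder) {
        this.capacity = capacity;
        this.adder = adder;
    }

    /** Items needed to top the queue up, or 0 while it is more than half full. */
    static int itemsToAdd(int capacity, int currentSize) {
        return currentSize <= capacity / 2 ? capacity - currentSize : 0;
    }

    /**
     * Meant to be called from a listener callback: it only schedules work.
     * Returns true if a refill was scheduled, false if none is needed or one is already pending.
     */
    boolean onQueueSize(int currentSize) {
        int missing = itemsToAdd(capacity, currentSize);
        if (missing == 0 || !refillPending.compareAndSet(false, true)) {
            return false;
        }
        worker.execute(() -> {
            try {
                adder.accept(missing); // the slow part runs on the worker thread
            } finally {
                refillPending.set(false);
            }
        });
        return true;
    }

    @Override
    public void close() {
        worker.shutdown();
        try {
            worker.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AtomicInteger added = new AtomicInteger();
        try (RefillSketch refill = new RefillSketch(100, added::addAndGet)) {
            refill.onQueueSize(80); // more than half full: nothing is scheduled
            refill.onQueueSize(40); // half full or less: schedules adding 60 items
        }
        System.out.println("items added: " + added.get()); // prints "items added: 60"
    }
}
```

In the producer, eventReceived would then call something like refill.onQueueSize(children.size()) and return immediately. The single worker thread and the pending flag keep at most one refill running, so the item counter is only ever touched by one thread at a time.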
Consumer
=3D-=3D-=3D-=3D-= =3D-

package com.zk;

import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOExce= ption;
import java.text.DateFormat;
import java.text.SimpleDateFormat= ;
import java.util.Date;

import org.apache.curator.framework.Cura= torFramework;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
impo= rt org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.= apache.curator.framework.state.ConnectionState;
import org.apache.curato= r.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



public class MyQueueConsumer implements Closeable {

    private DistributedQueue<CrawlUrl> queue;

    String name;

    String id;

    FileWriter timeCounter;

    final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);

    final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    int numberOfProcessedURL;

    private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;

    private class FileWriterThread extends Thread {

        public FileWriterThread() {
            // empty ctor
        }

        @Override
        public void run() {
            // We write the stats:

            try {
                while (true) {
                    MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " " +
                            "[numberOfProcessed=" + MyQueueConsumer.this.numberOfProcessedURL + "]\n");
                    MyQueueConsumer.this.timeCounter.flush();

                    // Sleeps 5 minutes
                    Thread.sleep(300000);
                }
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
        this.id = id;
        this.name = java.net.InetAddress.getLocalHost().getHostName();
        this.timeCounter = new FileWriter(new File("MyQueueConsumer_" + this.name + "_" + id + "_timeCounter.txt"));

//        this.timeCounterThread = new FileWriterThread();
//        this.timeCounterThread.start();
        this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new QueueConsumer<CrawlUrl>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println(String.format("[%s] connection state changed to %s", id, newState));
            }

            @Override
            public void consumeMessage(CrawlUrl url) throws Exception {
                try {
                    LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "[" + id + "-" + MyQueueConsumer.this.name + "] processed " + url.url);
                    MyQueueConsumer.this.numberOfProcessedURL++;
                } catch (Exception e) {
                    LOG.error("[" + id + "-" + MyQueueConsumer.this.name + "]" + e.getMessage() + " for url " + url.url);
                }
            }

        });
        try {
            this.queue.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        try {
            CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);

            final String connectString;
            System.out.println("DEBUG = " + Utils.DEBUG);
            if (props.useZkTestServer()) {
                System.out.println("Will launch from zkTestServer");
                TestingServer server = new TestingServer();
                connectString = server.getConnectString();
            } else {
                connectString = props.getZkServer();
            }

            final CuratorFramework framework = Utils.newFramework(connectString);
            framework.start();
            final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];

            for (int i = 0; i < queueConsumers.length; i++) {
                queueConsumers[i] = new MyQueueConsumer(framework, "id_" + i);
            }

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    // close workers
                    Throwable t = null;
                    LOG.info("We close the workers");
                    for (MyQueueConsumer queueConsumer : queueConsumers) {
                        try {
                            queueConsumer.close();
                        } catch (Throwable th) {
                            if (t == null) {
                                t = th;
                            }
                        }
                    }
                    // throw first exception that we encountered
                    if (t != null) {
                        throw new RuntimeException("some workers failed to close", t);
                    }
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws IOException {
        this.queue.close();
    }
}



Main
-=-=-

package com.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;



public class QueueTestMain {

    /**
     * @param args
     */
    public static void main(String[] args) {
        CrawlerPropertyFile props;
        try {
            props = new CrawlerPropertyFile(args[0]);

            final String connectString;
            System.out.println("DEBUG = " + Utils.DEBUG);
            if (props.useZkTestServer()) {
                System.out.println("Will launch from zkTestServer");
                TestingServer server = new TestingServer();
                connectString = server.getConnectString();
            } else {
                connectString = props.getZkServer();
            }

            final CuratorFramework framework = Utils.newFramework(connectString);
            framework.start();


            if (args[1] != null && args[1].equalsIgnoreCase("true")) {
                @SuppressWarnings("unused")
                QueueProducer producer = new QueueProducer(framework);
            } else {

                final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];

                for (int i = 0; i < queueConsumers.length; i++) {
                    queueConsumers[i] = new MyQueueConsumer(framework, "id_" + i);
                }

                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        // close workers
                        Throwable t = null;
                        for (MyQueueConsumer queueConsumer : queueConsumers) {
                            try {
                                queueConsumer.close();
                            } catch (Throwable th) {
                                if (t == null) {
                                    t = th;
                                }
                            }
                        }
                        // throw first exception that we encountered
                        if (t != null) {
                            throw new RuntimeException("some workers failed to close", t);
                        }
                    }
                });

            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
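
Side note: the shutdown hook in the classes above closes every consumer and keeps only the first failure. A minimal standalone sketch of that pattern, using only the JDK, could look like the following. The class name `CloseAllSketch` and the helper `closeAll` are hypothetical and are not part of the code above or of Curator. Recording later failures with `addSuppressed` is an addition that the original hook does not have.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CloseAllSketch {

    // Hypothetical helper: closes every resource even if some fail,
    // and returns the first failure (or null when all closed cleanly).
    static Throwable closeAll(List<? extends Closeable> resources) {
        Throwable first = null;
        for (Closeable resource : resources) {
            try {
                resource.close();
            } catch (Throwable th) {
                if (first == null) {
                    first = th;
                } else if (th != first) {
                    // Assumption: keep later failures attached instead of dropping them.
                    first.addSuppressed(th);
                }
            }
        }
        return first;
    }

    public static void main(String[] args) {
        final int[] closed = {0};
        List<Closeable> workers = new ArrayList<>();
        workers.add(() -> closed[0]++);
        workers.add(() -> { throw new IOException("worker 1 failed"); });
        workers.add(() -> closed[0]++);

        Throwable failure = closeAll(workers);
        System.out.println("closed=" + closed[0]
                + " failure=" + (failure == null ? "none" : failure.getMessage()));
    }
}
```

In this sketch the failing worker does not prevent the remaining workers from being closed, which is the same intent as the loop in the shutdown hook.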




Example of output:





On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <jordan@jordanzimmerman.com> wrote:
Can you produce a test that shows this? Anything else interesting in the log? Of course, there could be a bug.

-Jordan

On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <bs4mailinglist@gmail.com> wrote:

> Hi
>
> I made a short test as following:
>
> - I have a quorum of 3 nodes for Zookeeper.
> - I wrote a class using Curator QueueProducer that produces items (random integers) all the time (when the queue is 10% full, it creates new items).
> - I wrote a simple class using Curator Queue Consumer which simply prints to Log "consumed item i".
>
> I tested some different combinations:
> - running the consumers on one, two or three nodes.
> - running one or more consumers in parallel on a given node.
>
>
> But, and here is my question: I see some very strange behavior when I have several consumers in parallel on a node. For example, running 5 consumers per node on 3 nodes, I see **very** slow throughput. Looking at my log, I see that most of the consumers are idle most of the time....
>
> Am I making a mistake somewhere?
> I was expecting to increase the throughput by adding more consumers; I am surprised to see the opposite....
>
> Thanks a lot
>
> Benjamin




<ir-hadoop4_log.txt>
<= /div>
--001a11c1ba0498a30304eb902cbf--