uima-user mailing list archives

From Jaroslaw Cwiklik <uim...@gmail.com>
Subject Re: UimaAS blocks when accessing a queue with multiple clients concurrently
Date Fri, 10 Dec 2010 16:09:15 GMT
Dietmar, I ran your code and so far I don't see a hang. I ran it
multiple times on my 2-core ThinkPad. Perhaps on a machine with more
CPUs the timing changes and exposes a race condition that results in
a hang. I just don't see it.

I *am* getting a hang if I don't give the process enough memory. I
tried to run your code with 200 threads and 600M of memory, and I got
an OOM and an eventual hang. There were a lot of exceptions, though,
which you say you don't see when you run it. I've noticed that your
code uses log4j; perhaps the exceptions are in the log?
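
A related pitfall, going by the MultithreadedClientApp example quoted further down: latch.countDown() is called inside the try block of run(), so a worker that throws (or dies with an OOM) never counts down and latch.await() keeps waiting. That would look like a hang as soon as exceptions start. A minimal sketch of counting down in a finally block follows. It is plain Java with no UIMA AS calls; the class name, runWorkers(), and the simulated failure are made up for illustration and only stand in for the real client calls.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchInFinallyDemo {

    /**
     * Starts `workers` tasks. Every task whose id is divisible by `failEvery`
     * throws, simulating a failed client call. Returns the number of failed
     * tasks, or -1 if the latch did not reach zero within the timeout.
     */
    static int runWorkers(int workers, int failEvery) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(workers);
        AtomicInteger failures = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            final int id = i;
            pool.execute(() -> {
                try {
                    // Placeholder for the real work (e.g. sending CASes).
                    if (id % failEvery == 0) {
                        throw new IllegalStateException("simulated failure in worker " + id);
                    }
                } catch (Exception e) {
                    failures.incrementAndGet();
                } finally {
                    // Runs on success and on failure, so await() cannot be
                    // left waiting for a worker that has already died.
                    latch.countDown();
                }
            });
        }

        // A bounded wait turns a would-be hang into a visible timeout.
        boolean finished = latch.await(10, TimeUnit.SECONDS);
        pool.shutdown();
        return finished ? failures.get() : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("failed workers: " + runWorkers(8, 2));
    }
}
```

The timeout on await() is an arbitrary choice for the sketch; the point is only that the main thread gets control back even when some workers fail.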

JC

2010/12/10 Dietmar Gräbner <d.graebner@gmail.com>

> Hi,
>
> I tested your example and it worked for both configurations
> (MeetingDetectorTae with and without the <analysisEngine> part). The
> main difference between your client and mine is the separation of the
> initialize and the submit process in your example.
> Here is my code (Main class and Worker Thread):
>
> The main class:
>
>
> import org.apache.log4j.Logger;
> import org.apache.log4j.xml.DOMConfigurator;
>
> /**
>  *
>  * A multithreaded testClient calling a worker.
>  *
>  */
> public class MultithreadedTestClient {
>
>    //a server timeout not used in the current example
>    public static final int LINGREP_SERVER_CONNECTION_TIMEOUT = 600000;
>    public static final int TEST_NUMBEROFTHREADS = 100; // 100 threads
>
>
>    public static Logger theLog = Logger.getLogger(MultithreadedTestClient.class);
>    /**
>     * Main method for the test
>     *
>     * @param args no arguments are parsed
>     * @throws Exception some problem
>     */
>    public static void main(java.lang.String args[]) throws Exception {
>
>        // specify logfile settings
>        if (System.getProperty("log4j.configuration")==null){
>            DOMConfigurator.configure("resources/config/log4j.xml");
>        }
>
>        for (int i = 0; i < TEST_NUMBEROFTHREADS; i++) {
>            Runnable r = new MinimalWorkerThread(i);
>            new Thread(r).start();
>        }
>    }
>
> }
>
> And the Thread Worker:
>
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.uima.aae.client.UimaAsynchronousEngine;
> import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
> import org.apache.uima.cas.CAS;
>
> /**
>  * a runnable initializing and calling the Webservice
>  */
> public class MinimalWorkerThread implements Runnable {
>
>    private int mId = 0;
>    /**
>     * UIMA AS client owned by this worker.
>     */
>    private UimaAsynchronousEngine uimaEEEngine = null;
>     Map<String, Object> appCtx = new HashMap<String, Object>();
>
>     /**
>     * Creates the UIMA AS client and fills in the application context
>     * (broker URI, endpoint, CAS pool size). The client itself is
>     * initialized later, in run().
>     *
>     * @param id
>     *          numeric id of this worker, used in console output
>     */
>    public MinimalWorkerThread(int id) throws Exception {
>        mId = id;
>        // Initialize the AppContext
>        uimaEEEngine = new BaseUIMAAsynchronousEngine_impl();
>         // Add Broker URI
>        appCtx.put(UimaAsynchronousEngine.ServerUri,
> "tcp://143.205.174.93:61616");
>        // Add Endpoint
>        appCtx.put(UimaAsynchronousEngine.Endpoint,
> "MeetingDetectorTaeQueue");
>         appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
>    }
>
>     public void run() {
>
>        try {
>            System.out.println("running " + mId);
>            //initialize the client
>            uimaEEEngine.initialize(appCtx);
>            String text = "Id " + mId + "This is a nice test sentence. "
>                    + "And a second. Including a third.";
>            // get a CAS from the pool, set its document text, and send it
>            CAS cas = uimaEEEngine.getCAS();
>            //cas.setDocumentLanguage("en");
>            cas.setDocumentText(text);
>            uimaEEEngine.sendAndReceiveCAS(cas);
>            uimaEEEngine.collectionProcessingComplete();
>            System.out.println("Thread id " + mId + " returned " +
> cas.getDocumentText().substring(0, 5));
>            cas.reset();
>            uimaEEEngine.stop();
>        } catch (Exception e) {
>            System.err.println("Exception during Processing!");
>            e.printStackTrace();
>         }
>    }
> }
>
> On Thu, Dec 9, 2010 at 5:21 PM, Jaroslaw Cwiklik <uimaee@gmail.com> wrote:
> > For some reason attachments don't seem to work. Here is my code:
> >
> > /*
> >  * Licensed to the Apache Software Foundation (ASF) under one
> >  * or more contributor license agreements.  See the NOTICE file
> >  * distributed with this work for additional information
> >  * regarding copyright ownership.  The ASF licenses this file
> >  * to you under the Apache License, Version 2.0 (the
> >  * "License"); you may not use this file except in compliance
> >  * with the License.  You may obtain a copy of the License at
> >  *
> >  *   http://www.apache.org/licenses/LICENSE-2.0
> >  *
> >  * Unless required by applicable law or agreed to in writing,
> >  * software distributed under the License is distributed on an
> >  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >  * KIND, either express or implied.  See the License for the
> >  * specific language governing permissions and limitations
> >  * under the License.
> >  */
> >
> >
> > import java.util.HashMap;
> > import java.util.Map;
> > import java.util.concurrent.ArrayBlockingQueue;
> > import java.util.concurrent.CountDownLatch;
> > import java.util.concurrent.ThreadPoolExecutor;
> > import java.util.concurrent.TimeUnit;
> >
> > import org.apache.uima.aae.client.UimaAsynchronousEngine;
> > import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
> > import org.apache.uima.cas.CAS;
> >
> > /**
> >  * Example client application that can instantiate multiple UIMA AS clients,
> >  * each running in a separate thread.
> >  * <p>
> >  * Arguments: brokerUrl endpoint <howManyCASesToSend> <scaleup>
> >  * <p>
> >  * The application creates as many UIMA AS clients and threads as specified
> >  * in the "scaleup" argument. Each instance runs in its own thread and has
> >  * its own temp reply queue. It uses a synchronous <i>sendAndReceive()</i>
> >  * to send CASes to a remote service. For this a CAS Pool containing a
> >  * single CAS is sufficient.
> >  * <p>
> >  * Each client sends as many CASes to a remote service as specified in the
> >  * "howManyCASesToSend" argument.
> >  * <p>
> >  * The application initializes a CountDownLatch to the number of
> >  * clients/threads, which is then used to await completion. When a worker
> >  * thread completes its run, it sends a CPC and counts down the latch,
> >  * reducing the number of busy threads. When all threads finish, the
> >  * application is notified and can proceed to cleanup and shutdown.
> >  */
> > public class MultithreadedClientApp {
> >   public CountDownLatch latch = null;
> >
> >   public void initializeAndRun(String[] args) throws Exception {
> >     try {
> >       int howManyWorkers = Integer.parseInt(args[3]);  // how many threads
> >       // each worker counts down when done
> >       latch = new CountDownLatch(howManyWorkers);
> >       // Create Worker threads
> >       ClientWorker[] workers = new ClientWorker[howManyWorkers];
> >       final ArrayBlockingQueue<Runnable> queue =
> >           new ArrayBlockingQueue<Runnable>(howManyWorkers);
> >       // Thread Pool Executor to manage threads
> >       ThreadPoolExecutor threadPool = new ThreadPoolExecutor(howManyWorkers,
> >           howManyWorkers, Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
> >       // Start all threads
> >       threadPool.prestartAllCoreThreads();
> >       for (int i = 0; i < howManyWorkers; i++) {
> >         workers[i] = new ClientWorker();
> >         // 0 - brokerURL, 1 - queue name, 2 - how many CASes to send
> >         workers[i].initialize(args[0], args[1], Integer.parseInt(args[2]));
> >       }
> >
> >       for (int i = 0; i < howManyWorkers; i++) {
> >         threadPool.submit(workers[i]);  // start the workers
> >       }
> >       // Each worker counts down the latch after it is done sending CASes
> >       latch.await();
> >       // All worker threads completed, now stop the clients
> >       for (int i = 0; i < howManyWorkers; i++) {
> >         workers[i].stop();  // stop UIMA AS clients
> >       }
> >
> >       threadPool.shutdown();  // cleanup thread pool
> >       System.out.println("All UIMA AS Clients Finished Processing");
> >     } catch (Exception e) {
> >       e.printStackTrace();
> >     }
> >   }
> >
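> >   public static void main(String[] args) {
> >     MultithreadedClientApp client = new MultithreadedClientApp();
> >     try {
> >       if (args.length != 4) {
> >         System.out.println("Usage: java MultithreadedClientApp "
> >             + "<brokerURL> <serviceQueueName> <howManyCASesToSend> <howManyThreads>");
> >         return;  // without all four arguments the run below cannot start
> >       }
> >       client.initializeAndRun(args);
> >     } catch (Exception e) {
> >       e.printStackTrace();
> >     }
> >   }
> >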
> >   public class ClientWorker implements Runnable {
> >     private BaseUIMAAsynchronousEngine_impl uimaASClient = null;
> >     private int howManyCASes = 0;
> >
> >     public void initialize(String brokerUrl, String endpoint, int howManyCASes)
> >         throws Exception {
> >       uimaASClient = new BaseUIMAAsynchronousEngine_impl();
> >       Map<String, Object> appCtx = new HashMap<String, Object>();
> >       // Add Broker URI
> >       appCtx.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
> >       // Add Queue Name
> >       appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
> >       // Add the Cas Pool Size
> >       appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
> >
> >       // initialize
> >       uimaASClient.initialize(appCtx);
> >       this.howManyCASes = howManyCASes;
> >     }
> >
> >     public void stop() {
> >       uimaASClient.stop();
> >     }
> >
> >     public void run() {
> >       try {
> >         int sentSoFar = 0;
> >         CAS cas = uimaASClient.getCAS();
> >         int count = 1;
> >         while (sentSoFar < howManyCASes) {
> >           cas.setDocumentText("Some Text");
> >           uimaASClient.sendAndReceiveCAS(cas);
> >           System.out.println("Thread:" + Thread.currentThread().getId()
> >               + "::: Success Calling sendAndReceiveCAS(). Sent " + count++
> >               + " CASes so far");
> >           cas.reset();
> >           sentSoFar++;
> >         }
> >         uimaASClient.collectionProcessingComplete();
> >         System.out.println("Thread::" + Thread.currentThread().getId()
> >             + " Sent CPC. Thread Done");
> >         latch.countDown();
> >       } catch (Exception e) {
> >         e.printStackTrace();
> >       }
> >     }
> >   }
> > }
> >
> >
> >
> >
> > On Thu, Dec 9, 2010 at 11:04 AM, Jaroslaw Cwiklik <uimaee@gmail.com>
> wrote:
> >
> >> Dietmar, I tried my example application with an Aggregate Service and see
> >> no problem. Your previous email had no source attached.
> >> Attached please find the example application code I use in my testing. To
> >> run it:
> >>
> >> java -cp <classpath> MultithreadedClientApp <brokerURL> <serviceQueueName> <howManyCASesEachThreadShouldSend> <howManyThreads>
> >>
> >> The code adds a short text to each CAS before each call to
> >> sendAndReceive(). There are no app listeners attached to UIMA AS client.
> >>
> >> Jerry
> >>
> >> 2010/12/9 Dietmar Gräbner <d.graebner@gmail.com>
> >>
> >> Hi Eddie,
> >>>
> >>> wouldn't the client requests be serialized in the scenario you propose?
> >>>
> >>> Dietmar
> >>>
> >>>
> >>>
> >>> On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <eaepstein@gmail.com>
> >>> wrote:
> >>> > 2010/12/7 Dietmar Gräbner <d.graebner@gmail.com>:
> >>> >> I wrote a test client creating multiple threads. Each thread
> >>> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
> >>> >> aggregate with the sendAndReceiveCAS() call. When running the program
> >>> >> with e.g. 100 Threads the client gets stuck after processing X calls.
> >>> >
> >>> > FWIW, a similar multithreaded client scenario that has been used with
> >>> > no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
> >>> > with big enough casPool and have each thread call sendAndReceiveCAS()
> >>> > using the common API object.
> >>> >
> >>> > Eddie
> >>> >
> >>>
> >>
> >>
> >
>
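
On Eddie's suggestion at the bottom of the thread (one shared client, a casPool big enough for all threads, every thread calling sendAndReceiveCAS() on the common object), the shape of such a client could look roughly like the sketch below. It is plain Java with no UIMA AS dependency: PooledClient, getCas(), sendAndReceive(), and release() are hypothetical stand-ins for the shared BaseUIMAAsynchronousEngine_impl and its CAS pool, and a StringBuilder stands in for a CAS. It only illustrates the threading pattern and says nothing about how UIMA AS behaves internally.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedClientSketch {

    /** Hypothetical stand-in for one shared client with a fixed-size CAS pool. */
    static class PooledClient {
        private final BlockingQueue<StringBuilder> casPool;

        PooledClient(int casPoolSize) {
            casPool = new ArrayBlockingQueue<>(casPoolSize);
            for (int i = 0; i < casPoolSize; i++) {
                casPool.add(new StringBuilder());
            }
        }

        /** Blocks until a pooled "CAS" is free. */
        StringBuilder getCas() throws InterruptedException {
            return casPool.take();
        }

        /** Placeholder for a synchronous round trip to the service. */
        void sendAndReceive(StringBuilder cas) {
            cas.append(" [processed]");
        }

        /** Clears the "CAS" and returns it to the pool. */
        void release(StringBuilder cas) {
            cas.setLength(0);
            casPool.add(cas);
        }
    }

    /** Runs `threads` workers against ONE client; returns how many calls completed. */
    static int runThreads(int threads, int casPoolSize) throws InterruptedException {
        PooledClient client = new PooledClient(casPoolSize);  // created and set up once
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        AtomicInteger processed = new AtomicInteger();

        for (int i = 0; i < threads; i++) {
            final int id = i;
            pool.execute(() -> {
                StringBuilder cas = null;
                try {
                    cas = client.getCas();
                    cas.append("Id ").append(id).append(" This is a nice test sentence.");
                    client.sendAndReceive(cas);
                    processed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    if (cas != null) {
                        client.release(cas);  // always hand the CAS back
                    }
                    done.countDown();
                }
            });
        }

        done.await(10, TimeUnit.SECONDS);
        pool.shutdown();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed calls: " + runThreads(10, 10));
    }
}
```

In this stand-in, a pool as large as the thread count means no worker waits for a CAS; with a smaller pool the extra workers block in getCas() until one is released.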
