uima-user mailing list archives

From Dietmar Gräbner <d.graeb...@gmail.com>
Subject Re: UimaAS blocks when accessing a queue with multiple clients concurrently
Date Fri, 10 Dec 2010 08:35:09 GMT
Hi,

I tested your example and it worked for both configurations
(MeetingDetectorTae with and without the <analysisEngine> part). The
main difference between your client and mine is that your example
separates the initialize step from the submit step.
Here is my code (main class and worker thread):

The main class:


import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;

/**
 *
 * A multithreaded testClient calling a worker.
 *
 */
public class MultithreadedTestClient {

    //a server timeout not used in the current example
    public static final int LINGREP_SERVER_CONNECTION_TIMEOUT = 600000;
    public static final int TEST_NUMBEROFTHREADS = 100; // 100 threads


    public static Logger theLog = Logger.getLogger(MultithreadedTestClient.class);
    /**
     * Main method for the test
     *
     * @param args no arguments are parsed
     * @throws Exception some problem
     */
    public static void main(java.lang.String args[]) throws Exception {

        // specify logfile settings
        if (System.getProperty("log4j.configuration")==null){
            DOMConfigurator.configure("resources/config/log4j.xml");
        }

        for (int i = 0; i < TEST_NUMBEROFTHREADS; i++) {
            Runnable r = new MinimalWorkerThread(i);
            new Thread(r).start();
        }
    }

}

And the Thread Worker:

import java.util.HashMap;
import java.util.Map;

import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.cas.CAS;

/**
 * a runnable initializing and calling the Webservice
 */
public class MinimalWorkerThread implements Runnable {

    private int mId = 0;
    /**
     * UIMA AS client owned by this worker.
     */
    private UimaAsynchronousEngine uimaEEEngine = null;
    Map<String, Object> appCtx = new HashMap<String, Object>();

    /**
     * Constructor for the class. Stores the worker id, creates the client and
     * prepares the application context; the client is initialized in run().
     *
     * @param id
     *          numeric id of this worker, used in the console output
     */
    public MinimalWorkerThread(int id) throws Exception {
        mId = id;
        // Create the client and fill the AppContext
        uimaEEEngine = new BaseUIMAAsynchronousEngine_impl();
        // Add Broker URI
        appCtx.put(UimaAsynchronousEngine.ServerUri, "tcp://143.205.174.93:61616");
        // Add Endpoint
        appCtx.put(UimaAsynchronousEngine.Endpoint, "MeetingDetectorTaeQueue");
        appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
    }

    public void run() {

        try {
            System.out.println("running " + mId);
            //initialize the client
            uimaEEEngine.initialize(appCtx);
            String text = "Id " + mId + "This is a nice test sentence. And a second. Including a third.";
            // get a CAS from the pool and set its document text
            CAS cas = uimaEEEngine.getCAS();
            //cas.setDocumentLanguage("en");
            cas.setDocumentText(text);
            uimaEEEngine.sendAndReceiveCAS(cas);
            uimaEEEngine.collectionProcessingComplete();
            System.out.println("Thread id " + mId + " returned " + cas.getDocumentText().substring(0, 5));
            cas.reset();
            uimaEEEngine.stop();
        } catch (Exception e) {
            System.err.println("Exception during Processing!");
            e.printStackTrace();
        }
    }
}
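
The difference noted at the top of this message (all clients initialized first, work submitted afterwards) can be sketched with plain java.util.concurrent. This is only an illustration of the two-phase pattern: StubClient, runWorkers and the 30-second wait are hypothetical stand-ins and not part of the UIMA AS API, and nothing in this thread confirms that the ordering is what causes the blocking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TwoPhaseStartup {

    /** Hypothetical stand-in for a client object; NOT the UIMA AS API. */
    static class StubClient {
        private boolean initialized = false;

        void initialize() {
            initialized = true;
        }

        String process(String text) {
            if (!initialized) {
                throw new IllegalStateException("client not initialized");
            }
            return text.toUpperCase();
        }
    }

    /** Runs howMany workers (howMany > 0); returns how many completed, or -1 on timeout. */
    static int runWorkers(int howMany) throws InterruptedException {
        // Phase 1: create and initialize every client on the calling thread.
        List<StubClient> clients = new ArrayList<>();
        for (int i = 0; i < howMany; i++) {
            StubClient client = new StubClient();
            client.initialize();
            clients.add(client);
        }

        // Phase 2: only now hand the work to the thread pool.
        ExecutorService pool = Executors.newFixedThreadPool(howMany);
        CountDownLatch latch = new CountDownLatch(howMany);
        AtomicInteger completed = new AtomicInteger();
        for (StubClient client : clients) {
            pool.submit(() -> {
                try {
                    client.process("some text");
                    completed.incrementAndGet();
                } finally {
                    latch.countDown(); // count down even if process() throws
                }
            });
        }

        // Bounded wait (assumed value) so a stuck worker cannot hang the caller forever.
        boolean allDone = latch.await(30, TimeUnit.SECONDS);
        pool.shutdown();
        return allDone ? completed.get() : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed workers: " + runWorkers(10));
    }
}
```

Counting down in a finally block is a choice made for this sketch: the caller is released even when a worker fails, and the return value then shows how many workers actually succeeded.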









On Thu, Dec 9, 2010 at 5:21 PM, Jaroslaw Cwiklik <uimaee@gmail.com> wrote:
> For some reason attachments don't seem to work. Here is my code:
>
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  *   http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing,
>  * software distributed under the License is distributed on an
>  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>  * KIND, either express or implied.  See the License for the
>  * specific language governing permissions and limitations
>  * under the License.
>  */
>
>
> import java.util.HashMap;
> import java.util.Map;
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.uima.aae.client.UimaAsynchronousEngine;
> import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
> import org.apache.uima.cas.CAS;
>
> /**
>  * Example client application that can instantiate multiple UIMA AS clients,
>  * each running in a separate thread.
>  * <p>
>  * Arguments: brokerUrl endpoint <howManyCASesToSend> <scaleup>
>  * <p>
>  * The application creates as many UIMA AS clients and threads as specified
>  * in the "scaleup" argument. Each instance runs in its own thread and has its
>  * own temp reply queue. It uses a synchronous <i>sendAndReceive()</i> to send
>  * CASes to a remote service. For this a CAS Pool containing a single CAS is
>  * sufficient.
>  * <p>
>  * Each client sends as many CASes to a remote service as specified in the
>  * "howManyCASesToSend" argument.
>  * <p>
>  * The application initializes a CountDownLatch to the number of
>  * clients/threads, which is then used to await completion. When a worker
>  * thread completes its run, it sends a CPC and counts down the latch,
>  * reducing the number of busy threads. When all threads finish, the
>  * application is notified and can proceed to cleanup and shutdown.
>  *
>  */
> public class MultithreadedClientApp {
> public CountDownLatch latch = null;
> public void initializeAndRun(String[] args) throws Exception {
> try {
> int howManyWorkers = Integer.parseInt(args[3]);  // how many threads
> latch = new CountDownLatch(howManyWorkers);  // each worker counts down when done
> // Create Worker threads
> ClientWorker[] workers = new ClientWorker[howManyWorkers];
> final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
> howManyWorkers);
> // Thread Pool Executor to manage threads
> ThreadPoolExecutor threadPool = new ThreadPoolExecutor(howManyWorkers,
> howManyWorkers,
>                Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
> // Start all threads
> threadPool.prestartAllCoreThreads();
> for( int i=0; i < howManyWorkers; i++ ) {
> workers[i] = new ClientWorker();
> // 0 - brokerURL, 1 - queue name, 2 - how many CASes to send
> workers[i].initialize(args[0],args[1],Integer.parseInt(args[2]));
> }
>
> for( int i=0; i < howManyWorkers; i++ ) {
> threadPool.submit(workers[i]);  // start the workers
> }
> // Each worker counts down the latch after it is done sending CASes
> latch.await();
>  // All worker threads completed, now stop the clients
> for( int i=0; i < howManyWorkers; i++ ) {
> workers[i].stop();  // stop UIMA AS clients
> }
>
> threadPool.shutdown();  // cleanup thread pool
>  System.out.println("All UIMA AS Clients Finished Processing");
> } catch( Exception e ) {
> e.printStackTrace();
> }
>  }
> public static void main(String[] args) {
> MultithreadedClientApp client = new MultithreadedClientApp();
> try {
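> if ( args.length != 4 ) {
> System.out.println("Usage: java -cp <classpath> MultithreadedClientApp <brokerURL> <serviceQueueName> <howManyCASesEachThreadShouldSend> <howManyThreads>");
> return;
> }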
> client.initializeAndRun(args);
> } catch( Exception e ) {
> e.printStackTrace();
> }
> }
> public class ClientWorker implements Runnable {
> private BaseUIMAAsynchronousEngine_impl uimaASClient = null;
> private int howManyCASes = 0;
> public void initialize(String brokerUrl, String endpoint, int howManyCASes )
> throws Exception {
>  uimaASClient = new BaseUIMAAsynchronousEngine_impl();
> Map<String, Object> appCtx = new HashMap<String, Object>();
>    // set server URI and Endpoint
>    // Add Broker URI
>    appCtx.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
>    // Add Queue Name
>    appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
>    // Add the Cas Pool Size and initial FS heap size
>    appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
>
>    // initialize
>    uimaASClient.initialize(appCtx);
>    this.howManyCASes = howManyCASes;
> }
> public void stop() {
> uimaASClient.stop();
> }
> public void run() {
> try {
>    int sentSoFar = 0;
>             CAS cas = uimaASClient.getCAS();
>             int count=1;
>     while( sentSoFar < howManyCASes ) {
>
>          cas.setDocumentText("Some Text");
>
>          uimaASClient.sendAndReceiveCAS(cas);
>          System.out.println("Thread:"+Thread.currentThread().getId()+"::: Success Calling sendAndReceiveCAS(). Sent "+count++ + " CASes so far");
>          cas.reset();
>          sentSoFar++;
>     }
>     uimaASClient.collectionProcessingComplete();
>     System.out.println("Thread::"+Thread.currentThread().getId()+" Sent CPC. Thread Done");
>     latch.countDown();
> } catch( Exception e) {
> e.printStackTrace();
> }
> }
> }
> }
>
>
>
>
> On Thu, Dec 9, 2010 at 11:04 AM, Jaroslaw Cwiklik <uimaee@gmail.com> wrote:
>
>> Dietmar, I tried my example application with an Aggregate Service and see
>> no problem. Your previous email had no source attached.
>> Attached please find the example application code I use in my testing. To
>> run it:
>>
>> java -cp <classpath> MultithreadedClientApp
>> <brokerURL> <serviceQueueName> <howManyCASesEachThreadShouldSend> <howManyThreads>
>>
>> The code adds a short text to each CAS before each call to
>> sendAndReceive(). There are no app listeners attached to UIMA AS client.
>>
>> Jerry
>>
>> 2010/12/9 Dietmar Gräbner <d.graebner@gmail.com>
>>
>> Hi Eddie,
>>>
>>> wouldn't the client requests be serialized in the scenario you propose?
>>>
>>> Dietmar
>>>
>>>
>>>
>>> On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <eaepstein@gmail.com>
>>> wrote:
>>> > 2010/12/7 Dietmar Gräbner <d.graebner@gmail.com>:
>>> >> I wrote a test client creating multiple threads. Each thread
>>> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>>> >> aggregate with the sendAndReceiveCAS() call. When running the program
>>> >> with e.g. 100 Threads the client gets stuck after processing X calls.
>>> >
>>> > FWIW, a similar multithreaded client scenario that has been used with
>>> > no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
>>> > with big enough casPool and have each thread call sendAndReceiveCAS()
>>> > using the common API object.
>>> >
>>> > Eddie
>>> >
>>>
>>
>>
>
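
Eddie's suggestion quoted above (one shared client object with a large enough CAS pool, called from many threads) can be illustrated with a small stand-in. The sketch below is not UIMA AS code: PooledClient, take, release and run are hypothetical names, and the pool of StringBuilder objects only models the idea of a fixed pool of reusable CASes. Whether the real client overlaps requests in the same way is not confirmed in this thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedClientSketch {

    /** Hypothetical shared client with a fixed pool of reusable buffers; NOT the UIMA AS API. */
    static class PooledClient {
        private final BlockingQueue<StringBuilder> pool;

        PooledClient(int poolSize) {
            pool = new ArrayBlockingQueue<>(poolSize);
            for (int i = 0; i < poolSize; i++) {
                pool.add(new StringBuilder());
            }
        }

        /** Blocks until a buffer is free, so at most poolSize requests are in flight. */
        StringBuilder take() throws InterruptedException {
            return pool.take();
        }

        /** Stand-in for a synchronous request/reply call. */
        String sendAndReceive(StringBuilder buf) {
            return buf.toString().trim();
        }

        /** Clears the buffer and returns it to the pool. */
        void release(StringBuilder buf) {
            buf.setLength(0);
            pool.add(buf);
        }
    }

    /** Returns the number of successful calls, or -1 if the workers did not finish in time. */
    static int run(int threads, int poolSize) throws InterruptedException {
        PooledClient client = new PooledClient(poolSize); // one client shared by all threads
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        CountDownLatch latch = new CountDownLatch(threads);
        AtomicInteger ok = new AtomicInteger();
        for (int i = 0; i < threads; i++) {
            final int id = i;
            exec.submit(() -> {
                try {
                    StringBuilder buf = client.take();
                    try {
                        buf.append(" Id ").append(id).append(" ");
                        if (client.sendAndReceive(buf).equals("Id " + id)) {
                            ok.incrementAndGet();
                        }
                    } finally {
                        client.release(buf); // always hand the buffer back
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
        }
        boolean done = latch.await(30, TimeUnit.SECONDS);
        exec.shutdown();
        return done ? ok.get() : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("successful calls: " + run(100, 10));
    }
}
```

In this model the pool size, not the number of threads, bounds how many requests overlap: with a pool of one the calls run one after another, which is the serialization Dietmar asks about, while a larger pool lets several proceed at once.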
