uima-user mailing list archives

From Dietmar Gräbner <d.graeb...@gmail.com>
Subject Re: UimaAS blocks when accessing a queue with multiple clients concurrently
Date Mon, 13 Dec 2010 07:20:40 GMT
Hi,

I checked the logs again and found no exceptions. I just tried it with 10
threads, with the same result. Did you use the modified deployment
descriptor in your tests?
The hang only occurs when the <analysisEngine> tag is defined in the
deployment descriptor.

Below is the sample descriptor of the Meeting Detector Aggregate:

thx

Dietmar


<analysisEngineDeploymentDescription
  xmlns="http://uima.apache.org/resourceSpecifier">

  <name>Meeting Detector TAE</name>
  <description>Deploys Meeting Detector Aggregate AE with all its
delegates in the same JVM.</description>

  <deployment protocol="jms" provider="activemq">
    <service>
      <inputQueue endpoint="MeetingDetectorTaeQueue"
                  brokerURL="tcp://url:61616"/>
      <topDescriptor>
        <import location="../../descriptors/tutorial/ex4/MeetingDetectorTAE.xml"/>
      </topDescriptor>
      <analysisEngine key="MeetingDetectorTae">
        <delegates>
          <analysisEngine key="RoomNumber">
          </analysisEngine>
          <analysisEngine key="DateTime">
          </analysisEngine>
          <analysisEngine key="Meeting">
          </analysisEngine>
        </delegates>
      </analysisEngine>
    </service>
  </deployment>

</analysisEngineDeploymentDescription>
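
A hang that produces no exceptions is easier to report with a thread dump showing where each client thread is blocked. The following is a minimal sketch using only the JDK, not part of the test client in this thread: the class name `HangWatchdog`, the method names, and the 30-second timeout are illustrative assumptions, and the worker body is a placeholder where the real initialize / sendAndReceiveCAS / stop calls would go.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class HangWatchdog {

    /** Waits for the latch; prints a thread dump and returns false on timeout. */
    public static boolean awaitOrDump(CountDownLatch done, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (done.await(timeout, unit)) {
            return true;
        }
        System.err.println(dumpThreads());
        return false;
    }

    /** Builds a textual dump of all live threads and counts JVM-detected deadlocks. */
    public static String dumpThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        long[] deadlocked = mx.findDeadlockedThreads(); // null when none are found
        sb.append("Deadlocked threads: ")
          .append(deadlocked == null ? 0 : deadlocked.length)
          .append('\n');
        // ThreadInfo.toString() prints the thread state, held locks and the top frames
        for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
            sb.append(info);
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        int workers = 10;
        CountDownLatch done = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    // placeholder: the real client calls would go here
                } finally {
                    done.countDown(); // count down even if the work fails
                }
            }, "worker-" + i);
            t.start();
        }
        boolean ok = awaitOrDump(done, 30, TimeUnit.SECONDS);
        System.out.println(ok ? "all workers finished" : "workers still blocked, see dump");
    }
}
```

A thread that is merely waiting for a reply that never arrives will not be counted as deadlocked, so the per-thread stack frames in the dump are usually the more useful part.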

On Fri, Dec 10, 2010 at 5:09 PM, Jaroslaw Cwiklik <uimaee@gmail.com> wrote:
> Dietmar, I ran your code and so far I don't see a hang. I ran this code
> multiple times on my 2-core ThinkPad. Perhaps if you run this
> on a machine with more CPUs the timing changes and causes a race condition
> that results in a hang. I just don't see it.
>
> I *am* getting a hang if I don't provide enough memory to the process. I
> tried to run your code with 200 threads
> and 600M memory and I got an OOM and an eventual hang. There were a lot of
> exceptions, though, which you say you don't see
> when you run it. I've noticed that your code uses log4j; perhaps the exceptions
> are in the log?
>
> JC
>
> 2010/12/10 Dietmar Gräbner <d.graebner@gmail.com>
>
>> Hi,
>>
>> I tested your example and it worked for both configurations
>> (MeetingDetectorTae with and without the <analysisEngine> part). The
>> main difference between your client and mine is that your example
>> separates the initialize step from the submit step.
>> Here is my code (main class and worker thread):
>>
>> The main class:
>>
>>
>> import org.apache.log4j.Logger;
>> import org.apache.log4j.xml.DOMConfigurator;
>>
>> /**
>>  *
>>  * A multithreaded testClient calling a worker.
>>  *
>>  */
>> public class MultithreadedTestClient {
>>
>>    // a server timeout, not used in the current example
>>    public static final int LINGREP_SERVER_CONNECTION_TIMEOUT = 600000;
>>    public static final int TEST_NUMBEROFTHREADS = 100; // number of client threads
>>
>>
>>    public static Logger theLog =
>> Logger.getLogger(MultithreadedTestClient.class);
>>    /**
>>     * Main method for the test
>>     *
>>     * @param args no arguments are parsed
>>     * @throws Exception some problem
>>     */
>>    public static void main(java.lang.String args[]) throws Exception {
>>
>>        // specify logfile settings
>>        if (System.getProperty("log4j.configuration")==null){
>>            DOMConfigurator.configure("resources/config/log4j.xml");
>>        }
>>
>>        for (int i = 0; i < TEST_NUMBEROFTHREADS; i++) {
>>            Runnable r = new MinimalWorkerThread(i);
>>            new Thread(r).start();
>>        }
>>    }
>>
>> }
>>
>> And the Thread Worker:
>>
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> import org.apache.uima.aae.client.UimaAsynchronousEngine;
>> import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
>> import org.apache.uima.cas.CAS;
>>
>> /**
>>  * a runnable initializing and calling the Webservice
>>  */
>> public class MinimalWorkerThread implements Runnable {
>>
>>    private int mId = 0;
>>    /**
>>     * The UIMA AS client used by this worker.
>>     */
>>    private UimaAsynchronousEngine uimaEEEngine = null;
>>    /**
>>     * Application context passed to the client on initialize().
>>     */
>>    Map<String, Object> appCtx = new HashMap<String, Object>();
>>
>>    /**
>>     * Constructor for the class. Creates the client and fills the
>>     * application context with the broker URI, endpoint and CAS pool size.
>>     *
>>     * @param id
>>     *          number identifying this worker in the output
>>     */
>>    public MinimalWorkerThread(int id) throws Exception {
>>        mId = id;
>>        // Initialize the client and the AppContext
>>        uimaEEEngine = new BaseUIMAAsynchronousEngine_impl();
>>        // Add Broker URI
>>        appCtx.put(UimaAsynchronousEngine.ServerUri,
>>            "tcp://143.205.174.93:61616");
>>        // Add Endpoint
>>        appCtx.put(UimaAsynchronousEngine.Endpoint,
>>            "MeetingDetectorTaeQueue");
>>        appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
>>    }
>>
>>    public void run() {
>>
>>        try {
>>            System.out.println("running " + mId);
>>            // initialize the client
>>            uimaEEEngine.initialize(appCtx);
>>            String text = "Id " + mId + "This is a nice test sentence. "
>>                + "And a second. Including a third.";
>>            // get a CAS from the pool and set the document text
>>            CAS cas = uimaEEEngine.getCAS();
>>            //cas.setDocumentLanguage("en");
>>            cas.setDocumentText(text);
>>            uimaEEEngine.sendAndReceiveCAS(cas);
>>            uimaEEEngine.collectionProcessingComplete();
>>            System.out.println("Thread id " + mId + " returned "
>>                + cas.getDocumentText().substring(0, 5));
>>            cas.reset();
>>            uimaEEEngine.stop();
>>        } catch (Exception e) {
>>            System.err.println("Exception during Processing!");
>>            e.printStackTrace();
>>        }
>>    }
>> }
>>
>> On Thu, Dec 9, 2010 at 5:21 PM, Jaroslaw Cwiklik <uimaee@gmail.com> wrote:
>> > For some reason attachments don't seem to work. Here is my code:
>> >
>> > /*
>> >  * Licensed to the Apache Software Foundation (ASF) under one
>> >  * or more contributor license agreements.  See the NOTICE file
>> >  * distributed with this work for additional information
>> >  * regarding copyright ownership.  The ASF licenses this file
>> >  * to you under the Apache License, Version 2.0 (the
>> >  * "License"); you may not use this file except in compliance
>> >  * with the License.  You may obtain a copy of the License at
>> >  *
>> >  *   http://www.apache.org/licenses/LICENSE-2.0
>> >  *
>> >  * Unless required by applicable law or agreed to in writing,
>> >  * software distributed under the License is distributed on an
>> >  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> >  * KIND, either express or implied.  See the License for the
>> >  * specific language governing permissions and limitations
>> >  * under the License.
>> >  */
>> >
>> >
>> > import java.util.HashMap;
>> > import java.util.Map;
>> > import java.util.concurrent.ArrayBlockingQueue;
>> > import java.util.concurrent.CountDownLatch;
>> > import java.util.concurrent.ThreadPoolExecutor;
>> > import java.util.concurrent.TimeUnit;
>> >
>> > import org.apache.uima.aae.client.UimaAsynchronousEngine;
>> > import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
>> > import org.apache.uima.cas.CAS;
>> >
>> > /**
>> >  * Example client application that can instantiate multiple UIMA AS
>> >  * clients, each running in a separate thread.
>> >  * <p>
>> >  * Arguments: brokerUrl endpoint <howManyCASesToSend> <scaleup>
>> >  * <p>
>> >  * The application creates as many UIMA AS clients and threads as
>> >  * specified in the "scaleup" argument. Each instance runs in its own
>> >  * thread and has its own temp reply queue. It uses a synchronous
>> >  * <i>sendAndReceive()</i> to send CASes to a remote service. For this
>> >  * a CAS Pool containing a single CAS is sufficient.
>> >  * <p>
>> >  * Each client sends as many CASes to a remote service as specified in
>> >  * the "howManyCASesToSend" argument.
>> >  * <p>
>> >  * The application initializes a CountDownLatch to the number of
>> >  * clients/threads, which is then used to await completion. When a worker
>> >  * thread completes its run, it sends a CPC and counts down the latch,
>> >  * reducing the number of busy threads. When all threads finish, the
>> >  * application is notified and can proceed to cleanup and shutdown.
>> >  */
>> > public class MultithreadedClientApp {
>> > public CountDownLatch latch = null;
>> > public void initializeAndRun(String[] args) throws Exception {
>> > try {
>> > int howManyWorkers = Integer.parseInt(args[3]);  // how many threads
>> > // each worker counts down when done
>> > latch = new CountDownLatch(howManyWorkers);
>> > // Create Worker threads
>> > ClientWorker[] workers = new ClientWorker[howManyWorkers];
>> > final ArrayBlockingQueue<Runnable> queue =
>> >     new ArrayBlockingQueue<Runnable>(howManyWorkers);
>> > // Thread Pool Executor to manage threads
>> > ThreadPoolExecutor threadPool = new ThreadPoolExecutor(howManyWorkers,
>> >     howManyWorkers, Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
>> > // Start all threads
>> > threadPool.prestartAllCoreThreads();
>> > for( int i=0; i < howManyWorkers; i++ ) {
>> > workers[i] = new ClientWorker();
>> > // 0 - brokerURL, 1 - queue name, 2 - how many CASes to send
>> > workers[i].initialize(args[0],args[1],Integer.parseInt(args[2]));
>> > }
>> >
>> > for( int i=0; i < howManyWorkers; i++ ) {
>> > threadPool.submit(workers[i]);  // start the workers
>> > }
>> > // Each worker counts down the latch after it is done sending CASes
>> > latch.await();
>> >  // All worker threads completed, now stop the clients
>> > for( int i=0; i < howManyWorkers; i++ ) {
>> > workers[i].stop();  // stop UIMA AS clients
>> > }
>> >
>> > threadPool.shutdown();  // cleanup thread pool
>> >  System.out.println("All UIMA AS Clients Finished Processing");
>> > } catch( Exception e ) {
>> > e.printStackTrace();
>> > }
>> >  }
>> > public static void main(String[] args) {
>> > MultithreadedClientApp client = new MultithreadedClientApp();
>> > try {
>> > if ( args.length != 4 ) {
>> > System.out.println("Usage: java MultithreadedClientApp brokerUrl endpoint howManyCASesToSend scaleup");
>> > return;  // without all four arguments args[3] cannot be read
>> > }
>> > client.initializeAndRun(args);
>> > } catch( Exception e ) {
>> > e.printStackTrace();
>> > }
>> > }
>> > public class ClientWorker implements Runnable {
>> > private BaseUIMAAsynchronousEngine_impl uimaASClient = null;
>> > private int howManyCASes = 0;
>> > public void initialize(String brokerUrl, String endpoint,
>> >     int howManyCASes) throws Exception {
>> >  uimaASClient = new BaseUIMAAsynchronousEngine_impl();
>> > Map<String, Object> appCtx = new HashMap<String, Object>();
>> >    // set server URI and Endpoint
>> >    // Add Broker URI
>> >    appCtx.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
>> >    // Add Queue Name
>> >    appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
>> >    // Add the Cas Pool Size
>> >    appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
>> >
>> >    // initialize
>> >    uimaASClient.initialize(appCtx);
>> >    this.howManyCASes = howManyCASes;
>> > }
>> > public void stop() {
>> > uimaASClient.stop();
>> > }
>> > public void run() {
>> > try {
>> >     int sentSoFar = 0;
>> >     CAS cas = uimaASClient.getCAS();
>> >     int count = 1;
>> >     while( sentSoFar < howManyCASes ) {
>> >          cas.setDocumentText("Some Text");
>> >          uimaASClient.sendAndReceiveCAS(cas);
>> >          System.out.println("Thread:"+Thread.currentThread().getId()
>> >              +"::: Success Calling sendAndReceiveCAS(). Sent "+count++ + " CASes so far");
>> >          cas.reset();
>> >          sentSoFar++;
>> >     }
>> >     uimaASClient.collectionProcessingComplete();
>> >     System.out.println("Thread::"+Thread.currentThread().getId()+" Sent CPC. Thread Done");
>> > } catch( Exception e) {
>> >     e.printStackTrace();
>> > } finally {
>> >     // count down even on failure so latch.await() cannot wait forever
>> >     latch.countDown();
>> > }
>> > }
>> > }
>> > }
>> >
>> >
>> >
>> >
>> > On Thu, Dec 9, 2010 at 11:04 AM, Jaroslaw Cwiklik <uimaee@gmail.com>
>> wrote:
>> >
>> >> Dietmar, I tried my example application with an Aggregate Service and
>> >> see no problem. Your previous email had no source attached.
>> >> Attached please find the example application code I use in my testing. To
>> >> run it:
>> >>
>> >> java -cp <classpath> MultithreadedClientApp <brokerURL> <serviceQueueName> <howManyCASesEachThreadShouldSend> <howManyThreads>
>> >>
>> >> The code adds a short text to each CAS before each call to
>> >> sendAndReceive(). There are no app listeners attached to UIMA AS client.
>> >>
>> >> Jerry
>> >>
>> >> 2010/12/9 Dietmar Gräbner <d.graebner@gmail.com>
>> >>
>> >>> Hi Eddie,
>> >>>
>> >>> wouldn't the client requests be serialized in the scenario you propose?
>> >>>
>> >>> Dietmar
>> >>>
>> >>>
>> >>>
>> >>> On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <eaepstein@gmail.com>
>> >>> wrote:
>> >>> > 2010/12/7 Dietmar Gräbner <d.graebner@gmail.com>:
>> >>> >> I wrote a test client creating multiple threads. Each thread
>> >>> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>> >>> >> aggregate with the sendAndReceiveCAS() call. When running the program
>> >>> >> with e.g. 100 Threads the client gets stuck after processing X calls.
>> >>> >
>> >>> > FWIW, a similar multithreaded client scenario that has been used with
>> >>> > no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
>> >>> > with big enough casPool and have each thread call sendAndReceiveCAS()
>> >>> > using the common API object.
>> >>> >
>> >>> > Eddie
>> >>> >
>> >>>
>> >>
>> >>
>> >
>>
>
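
The shape of Eddie's suggestion quoted above (one shared client object, a CAS pool sized for the number of threads, every thread calling the same object) can be sketched with JDK classes only. This is a model under stated assumptions, not UIMA AS code: `StandInClient`, its methods, and the `StringBuilder` "CAS" are hypothetical stand-ins, and the sketch says nothing about how the real client behaves internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedClientSketch {

    /** Hypothetical stand-in for one shared client object; not the real API. */
    public static class StandInClient {
        private final BlockingQueue<StringBuilder> casPool;

        public StandInClient(int casPoolSize) {
            casPool = new ArrayBlockingQueue<>(casPoolSize);
            for (int i = 0; i < casPoolSize; i++) {
                casPool.add(new StringBuilder());
            }
        }

        /** Blocks until a pooled buffer is free, like a bounded CAS pool. */
        public StringBuilder getCas() throws InterruptedException {
            return casPool.take();
        }

        /** Pretends to do a synchronous round trip to the service. */
        public String sendAndReceive(StringBuilder cas) {
            return cas.toString().toUpperCase(Locale.ROOT);
        }

        /** Clears the buffer and returns it to the pool. */
        public void release(StringBuilder cas) {
            cas.setLength(0);
            casPool.add(cas);
        }
    }

    /** Runs one request per thread against a single shared client. */
    public static List<String> runAll(int threads, int casPoolSize) throws Exception {
        StandInClient client = new StandInClient(casPoolSize);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                final int id = i;
                futures.add(pool.submit(() -> {
                    StringBuilder cas = client.getCas();
                    try {
                        cas.append("id ").append(id);
                        return client.sendAndReceive(cas);
                    } finally {
                        client.release(cas); // always hand the buffer back
                    }
                }));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(10, 10));
    }
}
```

In this model, a pool at least as large as the thread count means no thread waits for a buffer, while a smaller pool makes the extra threads queue on `getCas()`.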
