uima-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jaroslaw Cwiklik <uim...@gmail.com>
Subject Re: UimaAS blocks when accessing a queue with multiple clients concurrently
Date Thu, 09 Dec 2010 16:21:02 GMT
For some reason attachment dont seem to work. Here is my code:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */


import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.cas.CAS;

/**
 * Example client application that can instantiate multiple UIMA AS clients
each running in
 * a separate thread.
 * <p>
 * Arguments: brokerUrl endpoint <howManyCASesToSend> <scaleup>
 * <p>
 * The application creates as many UIMA AS clients and threads as specified
in the "scaleup"
 * argument. Each instance runs in its own thread and has is its own temp
reply queue. It
 * uses a synchronous <i>sendAndReceive()</i> to send CASes to a remote
service. For this
 * a CAS Pool containing a single CAS is sufficient.
 * <p>
 * Each client sends as many CASes to a remote service as specified in the
"howManyCASesToSend"
 * argument.
 * <p>
 * The application initializes a CountDownLatch to the number of
clients/threads which is than
 * used to await completion. When a worker thread completes its run, it
sends a CPC and counts down the
 * latch reducing the number of busy threads. When all threads finish, the
application is notified
 * and can proceed to cleanup and shutdown.
 *
 */
public class MultithreadedClientApp {
public CountDownLatch latch = null;
public void initializeAndRun(String[] args) throws Exception {
try {
int howManyWorkers = Integer.parseInt(args[3]);  // how many threads
latch = new CountDownLatch(howManyWorkers);  // each worker counts down when
done
// Create Worker threads
ClientWorker[] workers = new ClientWorker[howManyWorkers];
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
howManyWorkers);
// Thread Pool Executor to manages threads
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(howManyWorkers,
howManyWorkers,
                Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
// Start all threads
threadPool.prestartAllCoreThreads();
for( int i=0; i < howManyWorkers; i++ ) {
workers[i] = new ClientWorker();
// 0 - brokerURL, 1 - queue name, 2 - how many CASes to send
workers[i].initialize(args[0],args[1],Integer.parseInt(args[2]));
}

for( int i=0; i < howManyWorkers; i++ ) {
threadPool.submit(workers[i]);  // start the workers
}
// Each worker counts down the latch after it is done sending CASes
latch.await();
 // All worker threads completed, now stop the clients
for( int i=0; i < howManyWorkers; i++ ) {
workers[i].stop();  // stop UIMA AS clients
}

threadPool.shutdown();  // cleanup thread pool
 System.out.println("All UIMA AS Clients Finished Processing");
} catch( Exception e ) {
e.printStackTrace();
}
 }
public static void main(String[] args) {
MultithreadedClientApp client = new MultithreadedClientApp();
try {
if ( args.length != 4 ) {
System.out.println("Usage: ");
}
client.initializeAndRun(args);
} catch( Exception e ) {
e.printStackTrace();
}
}
public class ClientWorker implements Runnable {
private BaseUIMAAsynchronousEngine_impl uimaASClient = null;
private int howManyCASes = 0;
public void initialize(String brokerUrl, String endpoint, int howManyCASes )
throws Exception {
 uimaASClient = new BaseUIMAAsynchronousEngine_impl();
Map<String, Object> appCtx = new HashMap<String, Object>();
    // set server URI and Endpoint
    // Add Broker URI
    appCtx.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
    // Add Queue Name
    appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
    // Add the Cas Pool Size and initial FS heap size
    appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);

    // initialize
    uimaASClient.initialize(appCtx);
    this.howManyCASes = howManyCASes;
}
public void stop() {
uimaASClient.stop();
}
public void run() {
try {
    int sentSoFar = 0;
             CAS cas = uimaASClient.getCAS();
             int count=1;
     while( sentSoFar < howManyCASes ) {

          cas.setDocumentText("Some Text");

          uimaASClient.sendAndReceiveCAS(cas);
          System.out.println("Thread:"+Thread.currentThread().getId()+":::
Success Calling sendAndReceiveCAS(). Sent "+count++ + " CASes so far");
          cas.reset();
          sentSoFar++;
     }
     uimaASClient.collectionProcessingComplete();
     System.out.println("Thread::"+Thread.currentThread().getId()+" Sent
CPC. Thread Done");
     latch.countDown();
} catch( Exception e) {
e.printStackTrace();
}
}
}
}




On Thu, Dec 9, 2010 at 11:04 AM, Jaroslaw Cwiklik <uimaee@gmail.com> wrote:

> Dietmar, I tried my example application with an Aggregate Service and see
> no problem. Your previous email had no source attached.
> Attached please find an example application code I use in my testing. To
> run it"
>
> java -cp <classpath> MultithreadedClientApp
> <brokerURL><serviceQueueName><howManyCASesEachThreadShouldSend><howManyThreads>
>
> The code adds a short text to each CAS before each call to
> sendAndReceive(). There are no app listeners attached to UIMA AS client.
>
> Jerry
>
> 2010/12/9 Dietmar Gräbner <d.graebner@gmail.com>
>
> Hi Eddie,
>>
>> wouldn't the client requests be serialized in the szenario you propose?
>>
>> Dietmar
>>
>>
>>
>> On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <eaepstein@gmail.com>
>> wrote:
>> > 2010/12/7 Dietmar Gräbner <d.graebner@gmail.com>:
>> >> I wrote a test client creating multiple threads. Each thread
>> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>> >> aggregate with the sendAndReceiveCAS() call. When running the program
>> >> with e.g. 100 Threads the client gets stuck after processing X calls.
>> >
>> > FWIW, a similar multithreaded client scenario that has been used with
>> > no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
>> > with big enough casPool and have each thread call sendAndReceiveCAS()
>> > using the common API object.
>> >
>> > Eddie
>> >
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message