helix-commits mailing list archives

From "Zhen Zhang (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (HELIX-443) Race condition in Helix register/unregister MessageHandlerFactory
Date Tue, 06 May 2014 18:45:17 GMT

     [ https://issues.apache.org/jira/browse/HELIX-443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhen Zhang updated HELIX-443:
-----------------------------

    Description: 
When a ZooKeeper (zk) session expires, we reset all the listeners, including HelixTaskExecutor
(which is a message listener on a Helix participant). reset() calls
HelixTaskExecutor#unregisterMessageHandlerFactory(), which throws an exception if the executor
has not terminated within 200ms. Because the exception is thrown before the factory is removed,
the MessageHandlerFactory stays in the handler-factory map.

{noformat}
  void unregisterMessageHandlerFactory(String type) {
    // shutdown executor-service. disconnect if fail
    ExecutorService executorSvc = _executorMap.remove(type);
    if (executorSvc != null) {
      List<Runnable> tasksLeft = executorSvc.shutdownNow();
      LOG.info(tasksLeft.size() + " tasks never executed for msgType: " + type + ". tasks: "
          + tasksLeft);
      try {
        if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
          LOG.error("executor-service for msgType: " + type
              + " is not fully terminated in 200ms. will disconnect helix-participant");
          throw new HelixException("fail to unregister msg-handler for msgType: " + type);
        }
      } catch (InterruptedException e) {
        LOG.error("interruped when waiting for executor-service shutdown for msgType: "
            + type, e);
      }
    }

    // reset state-model
    MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
    if (handlerFty != null) {
      handlerFty.reset();
    }
  }
{noformat}
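
The failure mode described above can be reproduced outside Helix. The following is a minimal sketch, not Helix code: the class UnregisterSketch, its two maps, the "demo-type" message type, and the String stand-in for the factory are all hypothetical, and IllegalStateException stands in for HelixException. It runs a task that ignores interruption, so awaitTermination() times out after 200ms and the factory entry is left behind while the executor entry is already gone.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class UnregisterSketch {
    // Hypothetical stand-ins for _executorMap and _handlerFactoryMap.
    final Map<String, ExecutorService> executorMap = new ConcurrentHashMap<>();
    final Map<String, String> handlerFactoryMap = new ConcurrentHashMap<>();

    void register(String type, String factory, int poolSize) {
        if (!handlerFactoryMap.containsKey(type)) {
            handlerFactoryMap.put(type, factory);
            executorMap.put(type, Executors.newFixedThreadPool(poolSize));
        }
    }

    // Same ordering as the quoted method: executor first, factory second.
    void unregister(String type) {
        ExecutorService executorSvc = executorMap.remove(type);
        if (executorSvc != null) {
            executorSvc.shutdownNow();
            try {
                if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
                    // Stand-in for HelixException; thrown before the factory is removed.
                    throw new IllegalStateException("fail to unregister msg-handler for msgType: " + type);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        handlerFactoryMap.remove(type); // never reached when the exception above is thrown
    }

    /** Returns true if a failed unregister leaves the factory entry without an executor entry. */
    static boolean reproduce() {
        UnregisterSketch s = new UnregisterSketch();
        s.register("demo-type", "demo-factory", 1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        s.executorMap.get("demo-type").execute(() -> {
            started.countDown();
            boolean released = false;
            while (!released) {
                try {
                    release.await();
                    released = true;
                } catch (InterruptedException ignored) {
                    // Swallow the interrupt sent by shutdownNow() and keep waiting.
                }
            }
        });
        try {
            started.await();
            s.unregister("demo-type");
        } catch (IllegalStateException expected) {
            // The executor did not terminate within 200ms.
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            release.countDown(); // let the worker finish so the JVM can exit
        }
        return s.handlerFactoryMap.containsKey("demo-type")
            && !s.executorMap.containsKey("demo-type");
    }

    public static void main(String[] args) {
        System.out.println("stale factory entry left behind: " + reproduce());
    }
}
```

The sketch keeps the quoted ordering on purpose: the executor entry is removed before the timeout check and the factory entry only after it, which is what lets the two maps diverge.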

When we reconnect to zk, we re-register the message-handler factory. Registration adds the
factory and its executor only if no factory is already registered for that type:

{noformat}
  public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
      int threadpoolSize) {
    if (!_handlerFactoryMap.containsKey(type)) {
      if (!type.equalsIgnoreCase(factory.getMessageType())) {
        throw new HelixException("Message factory type mismatch. Type: " + type + " factory : "
            + factory.getMessageType());
      }
      _handlerFactoryMap.put(type, factory);
      ExecutorService executorSvc = Executors.newFixedThreadPool(threadpoolSize);
      _executorMap.put(type, executorSvc);

      LOG.info("Added msg-factory for type: " + type + ", threadpool size " + threadpoolSize);
    } else {
      LOG.warn("Fail to register msg-handler-factory for type: " + type + ", pool-size: "
          + threadpoolSize + ", factory: " + factory);
    }
  }
{noformat}
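
The effect of the containsKey() guard can be seen in isolation. This is a minimal sketch under assumptions, not Helix code: RegisterGuardSketch, the "demo-type" message type, and the String stand-in for the factory are hypothetical, and the type-mismatch check is omitted. It shows that when a factory entry is already present, registration returns without creating an executor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class RegisterGuardSketch {
    // Hypothetical stand-ins for _handlerFactoryMap and _executorMap.
    final Map<String, String> handlerFactoryMap = new HashMap<>();
    final Map<String, ExecutorService> executorMap = new HashMap<>();

    /** Returns true if factory and executor were added, false if the guard skipped them. */
    boolean register(String type, String factory, int poolSize) {
        if (handlerFactoryMap.containsKey(type)) {
            return false; // corresponds to the LOG.warn branch in the quoted method
        }
        handlerFactoryMap.put(type, factory);
        executorMap.put(type, Executors.newFixedThreadPool(poolSize));
        return true;
    }

    /** Returns true if a pre-existing factory entry prevents an executor from being created. */
    static boolean staleEntryBlocksExecutor() {
        RegisterGuardSketch s = new RegisterGuardSketch();
        // Simulate a factory entry that an earlier unregister failed to remove.
        s.handlerFactoryMap.put("demo-type", "stale-factory");
        boolean added = s.register("demo-type", "new-factory", 1);
        return !added && !s.executorMap.containsKey("demo-type");
    }

    public static void main(String[] args) {
        System.out.println("executor missing after re-register: " + staleEntryBlocksExecutor());
    }
}
```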

So if the message-handler factory is not removed, re-registration is skipped and no new executor
is created (the old one was already removed from the executor map). The next incoming message
then hits a NullPointerException (NPE):
{noformat}
java.lang.NullPointerException
        at org.apache.helix.messaging.handling.HelixTaskExecutor.scheduleTask(HelixTaskExecutor.java:243)
        at org.apache.helix.messaging.handling.HelixTaskExecutor.onMessage(HelixTaskExecutor.java:531)
        at org.apache.helix.manager.zk.CallbackHandler.invoke(CallbackHandler.java:195)
        at org.apache.helix.manager.zk.CallbackHandler.handleChildChange(CallbackHandler.java:404)
        at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{noformat}
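
The last step of the chain can be sketched as follows. This is an illustration under assumptions, not the code at HelixTaskExecutor.java:243: it assumes the scheduling step looks up the executor by message type and invokes it without a null check. MissingExecutorSketch, schedule(), and "demo-type" are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;

class MissingExecutorSketch {
    // Hypothetical stand-in for _executorMap; left empty, as after a skipped re-registration.
    final Map<String, ExecutorService> executorMap = new HashMap<>();

    // Hypothetical scheduling step: look up the executor by type and hand it the task.
    void schedule(String type, Runnable task) {
        ExecutorService executorSvc = executorMap.get(type); // null: no executor registered
        executorSvc.execute(task); // dereferencing null throws NullPointerException
    }

    /** Returns true if scheduling for a type without an executor throws an NPE. */
    static boolean throwsNpe() {
        MissingExecutorSketch s = new MissingExecutorSketch();
        try {
            s.schedule("demo-type", () -> { });
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE on schedule: " + throwsNpe());
    }
}
```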

> Race condition in Helix register/unregister MessageHandlerFactory
> -----------------------------------------------------------------
>
>                 Key: HELIX-443
>                 URL: https://issues.apache.org/jira/browse/HELIX-443
>             Project: Apache Helix
>          Issue Type: Bug
>            Reporter: Zhen Zhang
>            Assignee: Zhen Zhang



--
This message was sent by Atlassian JIRA
(v6.2#6252)
