apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gauravgopi123 <...@git.apache.org>
Subject [GitHub] incubator-apex-malhar pull request: MLHR-1938 - Operator checkpoin...
Date Wed, 16 Dec 2015 15:13:37 GMT
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/125#discussion_r47787032
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract implementation of {@link AppIdAwareStorageAgent} which can be
    + * configured with a key-value store that implements {@link KeyValueStore}
    + * 
    + * NOTE - this should be picked up from APEX-CORE once the feature below is released
    + * https://issues.apache.org/jira/browse/APEXCORE-283
    + * 
    + * @param <S>
    + *          Store implementation
    + */
    +public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
    +    implements AppIdAwareStorageAgent, Serializable
    +{
    +
    +  protected S store;
    +  protected String applicationId;
    +  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
    +
    +  /**
    +   * Gets the store
    +   *
    +   * @return the store
    +   */
    +  public S getStore()
    +  {
    +    return store;
    +  }
    +
    +  /**
    +   * Sets the store
    +   *
    +   * @param store
    +   */
    +  public void setStore(S store)
    +  {
    +    this.store = store;
    +  }
    +
    +  /**
    +   * Return yarn application id of running application
    +   * 
    +   * @return
    +   */
    +  public String getApplicationId()
    +  {
    +    return applicationId;
    +  }
    +
    +  /**
    +   * Set yarn application id
    +   * 
    +   * @param applicationId
    +   */
    +  public void setApplicationId(String applicationId)
    +  {
    +    this.applicationId = applicationId;
    +  }
    +
    +  /**
    +   * Generates key from operator id and window id to store unique operator
    +   * checkpoints
    +   * 
    +   * @param operatorId
    +   * @param windowId
    +   * @return unique key for store
    +   */
    +  public static String generateKey(int operatorId, long windowId)
    +  {
    +    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + String.valueOf(windowId);
    +  }
    +
    +  /**
    +   * Stores the given operator object in configured store
    +   * 
    +   * @param object
    +   *          Operator object to store
    +   * @param operatorId
    +   *          id of the operator
    +   * @param windowId
    +   *          window id of operator to checkpoint
    +   * 
    +   */
    +  @Override
    +  public void save(Object object, int operatorId, long windowId) throws IOException
    +  {
    +
    +    try {
    +      store(generateKey(operatorId, windowId), object);
    +      logger.info("saved check point object key {} region {}", generateKey(operatorId,
windowId), applicationId);
    +    } catch (Throwable t) {
    +      logger.info("while saving {} {}", operatorId, windowId, t);
    +      DTThrowable.rethrow(t);
    +    }
    +  }
    +
    +  private synchronized void store(String checkpointKey, Object operator) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +    getStore().put(checkpointKey, operator);
    +  }
    +
    +  /**
    +   * Retrieves the operator object for given operator & window from configured
    +   * store
    +   * 
    +   * @param operatorId
    +   *          id of the operator
    +   * @param windowId
    +   *          window id of the checkpoint to load
    +   */
    +  @Override
    +  public Object load(int operatorId, long windowId)
    +  {
    +    Object obj = null;
    +    try {
    +      obj = retrieve(generateKey(operatorId, windowId));
    +      logger.info("retrieved object from store  key {} region {} ", generateKey(operatorId,
windowId), applicationId);
    +    } catch (Throwable t) {
    +      logger.info("while loading {} {}", operatorId, windowId, t);
    +      DTThrowable.rethrow(t);
    +    }
    +
    +    return obj;
    +  }
    +
    +  private synchronized Object retrieve(String checkpointKey) throws IOException
    +  {
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    return getStore().get(checkpointKey);
    +  }
    +
    +  /**
    +   * Removes stored operator object for given operatorId & windowId from store
    +   * 
    +   */
    +  @Override
    +  public void delete(int operatorId, long windowId) throws IOException
    +  {
    +
    +    if (!getStore().isConnected()) {
    +      getStore().connect();
    +    }
    +
    +    try {
    +      store.remove(generateKey(operatorId, windowId));
    +      logger.info("deleted object from store key {} region {}", generateKey(operatorId,
windowId));
    +    } catch (Throwable t) {
    --- End diff --
    
    Why catch Throwable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message