apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2130) implement scalable windowed storage
Date Wed, 27 Jul 2016 07:03:20 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395155#comment-15395155
] 

ASF GitHub Bot commented on APEXMALHAR-2130:
--------------------------------------------

Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r72389548
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
---
    @@ -0,0 +1,188 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +
    +import org.apache.apex.malhar.lib.state.spillable.Spillable;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable}
data structures
    + *
    + * @param <T> Type of the value per window
    + */
    +public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T>
    +{
    +  private SpillableStateStore store;
    +  private transient SpillableComplexComponentImpl sccImpl;
    +  private long bucket;
    +  private Serde<Window, Slice> windowSerde;
    +  private Serde<T, Slice> valueSerde;
    +
    +  protected transient Spillable.SpillableByteMap<Window, T> internMap;
    +
    +  public SpillableWindowedPlainStorage()
    +  {
    +  }
    +
    +  public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde,
Serde<T, Slice> valueSerde)
    +  {
    +    this.bucket = bucket;
    +    this.windowSerde = windowSerde;
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  public void setStore(SpillableStateStore store)
    +  {
    +    this.store = store;
    +  }
    +
    +  public void setBucket(long bucket)
    +  {
    +    this.bucket = bucket;
    +  }
    +
    +  public void setWindowSerde(Serde<Window, Slice> windowSerde)
    +  {
    +    this.windowSerde = windowSerde;
    +  }
    +
    +  public void setValueSerde(Serde<T, Slice> valueSerde)
    +  {
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  @Override
    +  public void put(Window window, T value)
    +  {
    +    internMap.put(window, value);
    +  }
    +
    +  @Override
    +  public T get(Window window)
    +  {
    +    return internMap.get(window);
    +  }
    +
    +  @Override
    +  public Iterable<Map.Entry<Window, T>> entrySet()
    +  {
    +    return internMap.entrySet();
    +  }
    +
    +  @Override
    +  public Iterator<Map.Entry<Window, T>> iterator()
    +  {
    +    return internMap.entrySet().iterator();
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internMap.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internMap.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    internMap.remove(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    internMap.put(toWindow, internMap.remove(fromWindow));
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store == null) {
    +      // provide a default store
    +      store = new ManagedStateSpillableStateStore();
    +    }
    +    if (bucket == 0) {
    +      // choose a bucket that is almost guaranteed to be unique
    +      bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode();
    +    }
    +    // set default serdes
    +    if (windowSerde == null) {
    +      windowSerde = new SerdeKryoSlice<>();
    +    }
    +    if (valueSerde == null) {
    +      valueSerde = new SerdeKryoSlice<>();
    +    }
    +    sccImpl = new SpillableComplexComponentImpl(store);
    +    sccImpl.setup(context);
    +    internMap = sccImpl.newSpillableByteMap(bucket, windowSerde, valueSerde);
    --- End diff --
    
    @davidyan74 I think you are getting a size of zero because you are allocating a new spillable
data structure here. You should check whether sccImpl is null and, only if it is null, initialize
it and all the Spillable data structures.


> implement scalable windowed storage
> -----------------------------------
>
>                 Key: APEXMALHAR-2130
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: bright chen
>            Assignee: David Yan
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot of the entire data set with the checkpointing
window id.  This should be done incrementally (ManagedState) to avoid wasting space on unchanged
data.
> 3. When recovering, it takes the recovery window id and restores to that snapshot
> 4. When a window is committed, all windows with a lower ID should be purged from the
store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, and because
of 2 and 3, we may want to add methods to the WindowedStorage interface so that the implementation
of WindowedOperator can notify the storage of checkpointing, recovering and committing of
a window.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message