flink-user mailing list archives

From Biplob Biswas <revolutioni...@gmail.com>
Subject Flink QueryableState with Sliding Window on RocksDB
Date Fri, 28 Jul 2017 13:34:21 GMT
Hi,

We recently moved from Spark Streaming to Flink for our organization's
stream processing requirements, and we are in the process of reducing the
number of external calls as much as possible. Earlier we were using HBase to
store the incoming data, but we now want to try out stateful operations on
top of Flink.

To that end, we have decided that we need a sliding window of size 180 days
with a slide interval of 1 day, so that we keep 180 days of state at any
given time. This state would be at most around 40-50 GB for the 180 days,
so we thought of using RocksDB for state storage.
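
As a rough illustration of what this window configuration implies, the plain-Java sketch below (no Flink dependency) computes which windows a single event falls into when the size is 180 days and the slide is 1 day. The class name `SlidingWindowSketch` and the sample timestamp are assumptions made for the example. The assignment logic follows the usual sliding-window rule (walk back from the latest window start in steps of one slide), so treat it as an approximation of what a sliding-window assigner does rather than Flink's actual code.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of sliding-window assignment.
class SlidingWindowSketch {

    /** Returns the [start, end) bounds, in ms, of every window containing the timestamp. */
    static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = Duration.ofDays(180).toMillis();
        long slide = Duration.ofDays(1).toMillis();
        long eventTime = 1501248861000L; // 2017-07-28T13:34:21Z, an arbitrary example
        List<long[]> windows = assignWindows(eventTime, size, slide);
        System.out.println("windows per event: " + windows.size()); // prints "windows per event: 180"
    }
}
```

Each event belongs to size / slide = 180 overlapping windows, which is worth keeping in mind when comparing a windowed design against the 40-50 GB estimate.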

The job flow we have in mind takes the incoming events plus some extra
information:

events.keyBy(eventTuple -> eventTuple.getEventUID())
      .flatMap(new UpdatedTxnState());

where UpdatedTxnState extends the RichFlatMapFunction class and looks
something like this:


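import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
// Tuple2/Tuple3 import omitted: the _1()/_2()/_3() accessors below match
// scala.Tuple2/scala.Tuple3; Flink's Java tuples expose f0/f1/f2 instead.

public class UpdatedTxnState extends RichFlatMapFunction<
    Tuple3<String, List<String>, EventType>, Tuple2<String, EventType>> {

  private ValueState<Tuple3<EventType, List<String>, String>> txnState;

  @Override
  public void open(Configuration config) throws Exception {
    // Value state holding the latest transaction tuple for the current key
    ValueStateDescriptor<Tuple3<EventType, List<String>, String>> stateDescriptor =
        new ValueStateDescriptor<>(
            "transaction",
            TypeInformation.of(
                new TypeHint<Tuple3<EventType, List<String>, String>>() {}));

    // Expose this state for queryable-state lookups under the name "transaction"
    stateDescriptor.setQueryable("transaction");

    this.txnState = getRuntimeContext().getState(stateDescriptor);
  }

  @Override
  public void flatMap(Tuple3<String, List<String>, EventType> input,
                      Collector<Tuple2<String, EventType>> output) throws Exception {
    // Store the input fields in reversed order, then emit (first field, event type)
    txnState.update(new Tuple3<>(input._3(), input._2(), input._1()));

    output.collect(new Tuple2<>(input._1(), input._3()));
  }
}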


Now, I have a couple of questions:
1. How can I create a sliding window on top of this state? I can think of
doing a keyBy on the output of the flatMap, but that doesn't really make
much sense to me, and I couldn't find a way to build state after windowing.
2. Can I query the state by the name I defined here, "transaction", from
anywhere in my job?
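
To make question 1 more concrete, here is a minimal plain-Java sketch (again without any Flink API) of the behaviour being asked for: per-key state that retains only the last 180 days of entries. Everything in it is hypothetical, including the names `RollingKeyedState`, `update`, and `size`, and it assumes timestamps arrive in non-decreasing order for each key. It only illustrates the intended retention semantics and is not a suggested Flink implementation.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (no Flink dependency); all names here are hypothetical.
class RollingKeyedState {
    private static final long RETENTION_MS = Duration.ofDays(180).toMillis();

    // One retained event: its timestamp and an opaque payload.
    static final class Entry {
        final long timestampMs;
        final String payload;

        Entry(long timestampMs, String payload) {
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    // Stand-in for keyed state: event UID -> entries in arrival order.
    private final Map<String, Deque<Entry>> stateByKey = new HashMap<>();

    /** Adds an event, then drops entries for that key that are 180 days old or older. */
    void update(String eventUid, long timestampMs, String payload) {
        Deque<Entry> entries = stateByKey.computeIfAbsent(eventUid, k -> new ArrayDeque<>());
        entries.addLast(new Entry(timestampMs, payload));
        long cutoff = timestampMs - RETENTION_MS;
        // Assumes in-order timestamps per key, so the oldest entry is at the head.
        while (!entries.isEmpty() && entries.peekFirst().timestampMs <= cutoff) {
            entries.removeFirst();
        }
    }

    /** Number of entries currently retained for a key. */
    int size(String eventUid) {
        Deque<Entry> entries = stateByKey.get(eventUid);
        return entries == null ? 0 : entries.size();
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        RollingKeyedState state = new RollingKeyedState();
        state.update("uid-1", 0L, "a");
        state.update("uid-1", 100 * day, "b");
        state.update("uid-1", 200 * day, "c"); // the day-0 entry is now older than 180 days
        System.out.println("retained for uid-1: " + state.size("uid-1")); // prints "retained for uid-1: 2"
    }
}
```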

Thanks,
Biplob



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-QueryableState-with-Sliding-Window-on-RocksDB-tp14514.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
