flink-user mailing list archives

From Niels Basjes <Ni...@basjes.nl>
Subject Re: Periodic flush sink?
Date Sat, 29 Apr 2017 16:05:05 GMT
Thanks.

The specific table I have here is used for debugging purposes, so I set a
12-hour TTL on the data at the HBase level.
So I'm not worried about the HFiles.
Doing a lot of 'small' calls has an impact on HBase as a whole (not just
this table), so I want buffering.
But with a buffer that can hold 1000 events, if I create 10 events with a
single page and I'm the only one on the site (at that moment), those
events will stay buffered for far too long.

I did a quick test and this seems to work for my case.
In what situations do you guys expect this code construct to fail? Any edge
cases I missed?

Niels

private transient BufferedMutator mutator = null;
private transient Timer timer = null;

@Override
public void open(Configuration parameters) throws Exception {
  org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create();
  Connection connection = ConnectionFactory.createConnection(hbaseConfig);

  mutator = connection.getBufferedMutator(
    new BufferedMutatorParams(TableName.valueOf(tableName))
      .pool(getDefaultExecutor(hbaseConfig))
      .writeBufferSize(HBASE_BUFFER_SIZE)
  );

  timer = new Timer();
  timer.schedule(new TimerTask(){
    @Override
    public void run() {
      try {
        MySink.this.mutator.flush();
      } catch (Exception e) {
        // Ignore
      }
    }}, HBASE_BUFFER_AUTO_FLUSH_INTERVAL, HBASE_BUFFER_AUTO_FLUSH_INTERVAL);
}

@Override
public void close() throws IOException {
  timer.cancel();
  mutator.close();
}
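
Editor's note: a few edge cases in the construct above can be illustrated
with a minimal sketch that has no HBase or Flink dependency. Everything in
it is an assumption made for illustration: PeriodicFlushBuffer is a
hypothetical helper, and its writer callback merely stands in for the real
BufferedMutator write. It shows three things: the timer thread and the
processing thread share one lock, a failure in the background flush is
remembered and rethrown on the next call instead of being ignored, and
close() stops the timer before the final flush. It does not cover closing
the HBase Connection created in open() (the posted close() only closes the
mutator), nor what should happen to buffered records if Flink checkpointing
is enabled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical helper: buffers items and flushes on size or on a timer. */
class PeriodicFlushBuffer<T> implements AutoCloseable {
  private final List<T> buffer = new ArrayList<>();
  private final Consumer<List<T>> writer; // stands in for the real write call
  private final int maxSize;
  private final ScheduledExecutorService scheduler;
  private final AtomicReference<RuntimeException> asyncFailure =
      new AtomicReference<>();

  PeriodicFlushBuffer(Consumer<List<T>> writer, int maxSize, long intervalMillis) {
    this.writer = writer;
    this.maxSize = maxSize;
    this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "periodic-flush");
      t.setDaemon(true); // the flush thread must not keep the JVM alive
      return t;
    });
    scheduler.scheduleWithFixedDelay(
        this::flushQuietly, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
  }

  /** Called by the processing thread for every record. */
  synchronized void add(T item) {
    rethrowAsyncFailure();
    buffer.add(item);
    if (buffer.size() >= maxSize) {
      flush();
    }
  }

  /** Writes out whatever is buffered; either thread may call this. */
  synchronized void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    // Clear only after a successful write, so a failed batch stays buffered.
    writer.accept(new ArrayList<>(buffer));
    buffer.clear();
  }

  /** Timer entry point: remember the first failure instead of dropping it. */
  private void flushQuietly() {
    try {
      flush();
    } catch (RuntimeException e) {
      asyncFailure.compareAndSet(null, e);
    }
  }

  private void rethrowAsyncFailure() {
    RuntimeException e = asyncFailure.getAndSet(null);
    if (e != null) {
      throw new IllegalStateException("Background flush failed", e);
    }
  }

  @Override
  public void close() {
    scheduler.shutdown(); // no further periodic flushes are started
    flush();              // final flush on the caller's thread
    rethrowAsyncFailure();
  }
}
```

In a sink, add() would be called from invoke() and close() from the sink's
close(); whether the extra lock is needed depends on how thread-safe the
underlying buffer already is, which this sketch deliberately does not assume.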





On Sat, Apr 29, 2017 at 4:57 PM, Ted Yu <yuzhihong@gmail.com> wrote:

> I expect a Flink expert to answer your question.
>
> bq. I get a flush of the buffers at least every few seconds
>
> From the HBase point of view, during low-traffic periods the above may
> result in many small HFiles, leading to more work for compaction.
>
> FYI
>
> On Sat, Apr 29, 2017 at 7:32 AM, Niels Basjes <Niels@basjes.nl> wrote:
>
>> Hi,
>>
>> I have a sink that writes my records into HBase.
>>
>> The data stream is attached to measurements from an internal testing
>> instance of the website.
>> As a consequence there are periods of really high load (someone is doing
>> a load test) and really low load (only a handful of people are testing
>> stuff).
>>
>> I read the records from Kafka and I want to write them into HBase.
>> Because buffering the writes between the client and the server is more
>> efficient under high load, I use a BufferedMutator, as indicated by HBase.
>>
>> This BufferedMutator works with a 'fixed size' buffer, and under high load
>> setting it to a few MiB greatly improves write performance to HBase.
>> However, under low load you have to wait until the buffer is full, and that
>> can take a LONG time (hours) when the load is really low.
>>
>> I want to fire a periodic event into my sink to ensure I get a flush of
>> the buffers at least every few seconds.
>>
>> Should I simply implement a standard Java TimerTask and fire it using a
>> Timer? Or is there a better way of doing that in Flink?
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
