From Vijay <vijikar...@yahoo.com.INVALID>
Subject Re: RollingSink
Date Wed, 23 Mar 2016 14:28:07 GMT
Yes, I have updated it on all cluster nodes and restarted the entire cluster.

Do you see any problems with the steps that I followed?


> On Mar 23, 2016, at 7:18 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi,
> did you update the log4j.properties file on all nodes where the TaskManagers run and did you restart the whole cluster?
> Cheers,
> Aljoscha
>> On 23 Mar 2016, at 15:02, Vijay <vijikarthi@yahoo.com.INVALID> wrote:
>> Hi Aljoscha,
>> I am using a standalone Flink cluster (3 nodes). I run the Flink job by submitting/uploading the jar through the Flink UI.
>> I have built Flink from Maven and modified the RollingSink code to add new debug statements.
>> I have also packaged the streaming file system connector package (including the RollingSink changes) into the job jar file. The changes include both System.out and logger statements.
>> I updated the log4j properties file to DEBUG.
>> Regards,
>> Vijay
>>> On Mar 23, 2016, at 6:48 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>> Hi,
>>> what were the steps you took? By the way, are you running this on YARN or in standalone mode? How are you starting the Flink job? Do you still not see DEBUG entries in the log?
>>> Cheers,
>>> Aljoscha
>>>> On 23 Mar 2016, at 14:32, Vijay <vijikarthi@yahoo.com> wrote:
>>>> I have changed the properties file but it did not help.
>>>> Regards,
>>>> Vijay
>>>>> On Mar 23, 2016, at 5:39 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>> Ok, then you should be able to change the log level to DEBUG in conf/log4j.properties.
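One way to double-check the change on each node is to read conf/log4j.properties back and see which level it assigns to the connector package. The following is a minimal sketch, assuming the log4j 1.x properties format (`log4j.rootLogger=LEVEL, appender` plus optional `log4j.logger.<name>=LEVEL` overrides). The class and method names are hypothetical, the sample file content is illustrative rather than Flink's shipped default, and the lookup ignores log4j details such as the `INHERITED` keyword or appender thresholds.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class Log4jLevelCheck {

    // Illustrative log4j 1.x content (an assumption, not Flink's default file).
    static final String SAMPLE =
        "log4j.rootLogger=INFO, file\n"
      + "log4j.logger.org.apache.flink.streaming.connectors.fs=DEBUG\n";

    // Returns the level configured for loggerName: the most specific
    // "log4j.logger.<prefix>" entry wins, otherwise the root logger level.
    public static String effectiveLevel(Properties props, String loggerName) {
        String name = loggerName;
        while (true) {
            String value = props.getProperty("log4j.logger." + name);
            if (value != null) {
                return firstToken(value);
            }
            int dot = name.lastIndexOf('.');
            if (dot < 0) {
                break;
            }
            name = name.substring(0, dot); // walk up to the parent logger
        }
        String root = props.getProperty("log4j.rootLogger");
        return root == null ? null : firstToken(root);
    }

    // The value has the form "LEVEL, appender1, appender2"; keep only the level.
    static String firstToken(String value) {
        return value.split(",")[0].trim();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        if (args.length > 0) {
            // Pass the path to conf/log4j.properties on the node being checked.
            try (Reader reader = Files.newBufferedReader(Paths.get(args[0]))) {
                props.load(reader);
            }
        } else {
            props.load(new StringReader(SAMPLE));
        }
        System.out.println(effectiveLevel(props,
            "org.apache.flink.streaming.connectors.fs.RollingSink"));
    }
}
```

Running it against the file on every TaskManager node would show whether the DEBUG setting actually reached that node; it does not prove that the running JVM picked up that file.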
>>>>>> On 23 Mar 2016, at 12:41, Vijay <vijikarthi@yahoo.com> wrote:
>>>>>> I think only the ERROR category gets displayed in the log file
>>>>>> Regards,
>>>>>> Vijay
>>>>>>> On Mar 23, 2016, at 2:30 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>>> Hi,
>>>>>>> are you seeing the regular log output from the RollingSink in the TaskManager logs?
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>> On 22 Mar 2016, at 20:03, Vijay Srinivasaraghavan <vijikarthi@yahoo.com>
>>>>>>>> I have tried both the log4j logger and System.out.println, but neither worked.
>>>>>>>> From what I have seen so far, the filesystem streaming connector classes are not packaged in the grand jar (flink-dist_2.10-1.1-SNAPSHOT.jar) that is copied to <FLINK_HOME>/build-target/lib as part of the Flink Maven build.
>>>>>>>> So, I manually copied (overwrote) the compiled class files from the org.apache.flink.streaming.connectors.fs package into my "Flink job" distribution jar (otherwise it was using the standard jars that are defined as a mvn dependency in Artifactory) and then uploaded the jar to the JobManager.
>>>>>>>> Am I missing something? How do I enable logging for the RollingSink?
>>>>>>>> <dependency>
>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>   <artifactId>flink-connector-filesystem_2.11</artifactId>
>>>>>>>>   <version>${flink.version}</version>
>>>>>>>>   <scope>provided</scope>
>>>>>>>> </dependency>
>>>>>>>> On Tuesday, March 22, 2016 3:04 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>>>> Hi,
>>>>>>>> how are you printing the debug statements?
>>>>>>>> But yeah, all the logic of renaming in-progress files and cleaning up after a failed job happens in restoreState(BucketState state). The steps are roughly:
>>>>>>>> 1. Move the current in-progress file to its final location
>>>>>>>> 2. Truncate the file if necessary (if truncate is not available, write a .valid-length file)
>>>>>>>> 3. Move pending files that were part of the checkpoint to their final location
>>>>>>>> 4. Clean up any leftover pending/in-progress files
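The four steps above can be sketched against the local file system with java.nio.file. This is an illustration of the described flow only, not the RollingSink source: the class name, the method signature, the `.pending` suffix and the `<file>.valid-length` naming are assumptions, and only the `_part-0-1_in-progress` to `part-0-1` rename pattern comes from this thread.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RestoreSketch {
    static final String PREFIX = "_";                      // from "_part-0-1_in-progress"
    static final String IN_PROGRESS_SUFFIX = "_in-progress";
    static final String PENDING_SUFFIX = ".pending";       // assumed naming

    // "_part-0-1_in-progress" or "_part-0-1.pending" -> "part-0-1"
    static String finalName(String name, String suffix) {
        return name.substring(PREFIX.length(), name.length() - suffix.length());
    }

    public static void restore(Path dir, String inProgress, long validLength,
                               List<String> pendingInCheckpoint, boolean canTruncate)
            throws IOException {
        if (inProgress != null) {
            // 1. Move the current in-progress file to its final location.
            Path src = dir.resolve(inProgress);
            Path dst = dir.resolve(finalName(inProgress, IN_PROGRESS_SUFFIX));
            if (Files.exists(src)) {
                Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
            }
            // 2. Drop bytes written after the checkpoint, or record the valid
            //    length in a side file when truncate is not available.
            if (Files.exists(dst) && Files.size(dst) > validLength) {
                if (canTruncate) {
                    try (FileChannel ch = FileChannel.open(dst, StandardOpenOption.WRITE)) {
                        ch.truncate(validLength);
                    }
                } else {
                    Path marker = dir.resolve(dst.getFileName() + ".valid-length");
                    Files.write(marker,
                        Long.toString(validLength).getBytes(StandardCharsets.UTF_8));
                }
            }
        }
        // 3. Move pending files that were part of the checkpoint.
        for (String pending : pendingInCheckpoint) {
            Path src = dir.resolve(pending);
            if (Files.exists(src)) {
                Files.move(src, dir.resolve(finalName(pending, PENDING_SUFFIX)),
                    StandardCopyOption.REPLACE_EXISTING);
            }
        }
        // 4. Delete whatever pending/in-progress files are still left.
        List<Path> leftovers;
        try (Stream<Path> files = Files.list(dir)) {
            leftovers = files.filter(f -> {
                String n = f.getFileName().toString();
                return n.endsWith(IN_PROGRESS_SUFFIX) || n.endsWith(PENDING_SUFFIX);
            }).collect(Collectors.toList());
        }
        for (Path f : leftovers) {
            Files.delete(f);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("restore-sketch");
        Files.write(dir.resolve("_part-0-1_in-progress"),
            "0123456789".getBytes(StandardCharsets.UTF_8));
        Files.write(dir.resolve("_part-0-0.pending"),
            "abc".getBytes(StandardCharsets.UTF_8));
        restore(dir, "_part-0-1_in-progress", 4L,
            Arrays.asList("_part-0-0.pending"), true);
        try (Stream<Path> files = Files.list(dir)) {
            files.forEach(p -> System.out.println(p.getFileName()));
        }
    }
}
```

The `canTruncate` flag stands in for the check of whether the target file system supports truncate; on HDFS the real sink would use the Hadoop FileSystem API rather than java.nio.file.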
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>> On 22 Mar 2016, at 10:08, Vijay Srinivasaraghavan <vijikarthi@yahoo.com.INVALID>
>>>>>>>>> Hello,
>>>>>>>>> I have enabled checkpointing and I am using RollingSink to write data from a KafkaConsumer to HDFS (2.7.x). To simulate failover/recovery, I stopped a TaskManager and the job got rescheduled to another TaskManager instance. At that moment, the current "in-progress" file gets closed and renamed from _part-0-1_in-progress to part-0-1.
>>>>>>>>> I was hoping to see the debug statements that I added to the "restoreState" method, but none of them gets printed. I am not sure whether the restoreState() method gets invoked in this scenario. Could you please help me understand the flow during a "failover" scenario?
>>>>>>>>> P.S.: Functionally the code appears to be working fine, but I am trying to understand the underlying implementation details.
>>>>>>>>> public void restoreState(BucketState
>>>>>>>>> Regards
>>>>>>>>> Vijay

