flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: [Flink-9407] Question about proposed ORC Sink !
Date Wed, 27 Jun 2018 07:59:53 GMT
Hi Sagar,

That's more of a question for the ORC community, but AFAIK the top-level type
is always a struct because it needs to wrap the fields, e.g.,
struct<name:string,age:int>
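
As a rough illustration of the point above, a flat field list such as the one in the question can be wrapped into a top-level struct before it is handed to ORC. This is only a sketch in plain Java: `OrcSchemaSketch` and `structOf` are hypothetical names, not part of Flink or ORC, and it assumes the schema is passed around as a string, as in the TypeDescription.fromString call quoted further down in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of Flink or ORC): wraps a flat field list in a top-level struct. */
final class OrcSchemaSketch {

    /** Builds a schema string such as "struct<name:string,age:int>" from ordered field/type pairs. */
    static String structOf(Map<String, String> fields) {
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("at least one field is required");
        }
        StringJoiner joiner = new StringJoiner(",", "struct<", ">");
        // Iteration order defines the column order, so callers should pass an ordered map.
        fields.forEach((name, type) -> joiner.add(name + ":" + type));
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("name", "string");
        fields.put("age", "int");
        // The resulting string is the kind of value one would pass to TypeDescription.fromString(...).
        System.out.println(structOf(fields)); // prints struct<name:string,age:int>
    }
}
```

A LinkedHashMap is used so that the column order in the schema matches the order in which the fields were declared.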

Best, Fabian

2018-06-26 22:38 GMT+02:00 sagar loke <sagarit2@gmail.com>:

> @zhangminglei,
>
> Question about the schema for ORC format:
>
> 1. Does it always need to be of the complex type "<Struct>"?
>
> 2. Or can it be created with individual data types directly,
>     e.g. "name:string, age:int"?
>
>
> Thanks,
> Sagar
>
> On Fri, Jun 22, 2018 at 11:56 PM, zhangminglei <18717838093@163.com>
> wrote:
>
>> Yes, it should be "exit". Thanks, Ted Yu. Exactly right!
>>
>> Cheers
>> Zhangminglei
>>
>> On Jun 23, 2018, at 12:40 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>
>> For #1, the word "exist" should be "exit", right?
>> Thanks
>>
>> -------- Original message --------
>> From: zhangminglei <18717838093@163.com>
>> Date: 6/23/18 10:12 AM (GMT+08:00)
>> To: sagar loke <sagarit2@gmail.com>
>> Cc: dev <dev@flink.apache.org>, user <user@flink.apache.org>
>> Subject: Re: [Flink-9407] Question about proposed ORC Sink !
>>
>> Hi, Sagar.
>>
>> 1. It solves the issue partially meaning files which have finished
>> checkpointing don't show .pending status but the files which were in
>> progress when the program exists are still in .pending state.
>>
>>
>> Ans:
>>
>> Yes. If the program exits before a checkpoint has completed, the files
>> stay in the .pending state. Under normal circumstances, a program running
>> in a production environment is never stopped and never exits as long as
>> everything is fine.
>>
>> 2. Ideally, the writer should work with default settings, correct? Meaning
>> we don't have to explicitly set these parameters to make it work.
>>     Is this assumption correct?
>>
>>
>> Ans:
>>
>> Yes, the writer should work with the default settings.
>> Yes, we do not have to set these parameters explicitly to make it work.
>> Yes, your assumption is correct.
>>
>> However, Flink is a real-time streaming framework, so in practice you
>> rarely keep the default settings for a specific business use case,
>> especially when working together with an *offline end* (such as Hadoop
>> MapReduce). In that case, you need to tell the offline end when a bucket
>> is closed and when the data for a specific bucket is ready. For that, take
>> a look at https://issues.apache.org/jira/browse/FLINK-9609.
>>
>> Cheers
>> Zhangminglei
>>
>>
>> On Jun 23, 2018, at 8:23 AM, sagar loke <sagarit2@gmail.com> wrote:
>>
>> Hi Zhangminglei,
>>
>> Thanks for the reply.
>>
>> 1. It solves the issue partially meaning files which have finished
>> checkpointing don't show .pending status but the files which were in
>> progress when the program exists are still in .pending state.
>>
>> 2. Ideally, the writer should work with default settings, correct? Meaning
>> we don't have to explicitly set these parameters to make it work.
>>     Is this assumption correct?
>>
>> Thanks,
>> Sagar
>>
>> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838093@163.com>
>> wrote:
>>
>>> Hi, Sagar. Please use the code below and you will see the part file
>>> status change from _part-0-107.in-progress to _part-0-107.pending and
>>> finally to part-0-107. Note that you need to run the program for a
>>> while. We also need to set some parameters, as shown below, and
>>> *enableCheckpointing* IS needed as well. I know why you always see the
>>> *.pending* file: the parameters below default to 60 seconds, even if you
>>> enable checkpointing. That is why you cannot see the finished file
>>> status until 60 seconds have passed.
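
The lifecycle described above (in-progress, then pending, then finished) can be pictured with a small toy model. This is a simplified sketch and not Flink code: `PartFileLifecycleSketch` and its methods are hypothetical names, time is passed in explicitly as milliseconds, and the model assumes a file is closed once it has been idle for at least the inactivity threshold and is finalized by the next completed checkpoint.

```java
/** Toy model of the part-file lifecycle; names are illustrative and not Flink's API. */
final class PartFileLifecycleSketch {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    private final long inactiveThresholdMs;
    private State state = State.IN_PROGRESS;
    private long lastWriteMs;

    PartFileLifecycleSketch(long inactiveThresholdMs, long createdAtMs) {
        this.inactiveThresholdMs = inactiveThresholdMs;
        this.lastWriteMs = createdAtMs;
    }

    State state() {
        return state;
    }

    /** A write keeps an in-progress file active. */
    void write(long nowMs) {
        if (state != State.IN_PROGRESS) {
            throw new IllegalStateException("file is already closed");
        }
        lastWriteMs = nowMs;
    }

    /** Periodic inactivity check: closes the file once it has been idle for the threshold. */
    void checkInactive(long nowMs) {
        if (state == State.IN_PROGRESS && nowMs - lastWriteMs >= inactiveThresholdMs) {
            state = State.PENDING;
        }
    }

    /** A completed checkpoint promotes a pending file; in-progress files are left alone. */
    void checkpointCompleted() {
        if (state == State.PENDING) {
            state = State.FINISHED;
        }
    }

    public static void main(String[] args) {
        PartFileLifecycleSketch file = new PartFileLifecycleSketch(2000, 0);
        file.checkpointCompleted();       // no effect: the file is still being written
        System.out.println(file.state()); // IN_PROGRESS
        file.checkInactive(2000);         // idle for 2000 ms, so the file is closed
        System.out.println(file.state()); // PENDING
        file.checkpointCompleted();       // the next checkpoint confirms the file
        System.out.println(file.state()); // FINISHED
    }
}
```

In this model a checkpoint alone never finalizes a file that is still open, which is one way to read why lowering the inactivity settings changes what shows up on HDFS.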
>>>
>>> Attached is the final result on my end, which shows what you are looking for.
>>>
>>> Please let me know if you still have the problem.
>>>
>>> Cheers
>>> Zhangminglei
>>>
>>> setInactiveBucketCheckInterval(2000)
>>> .setInactiveBucketThreshold(2000);
>>>
>>>
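>>> import org.apache.flink.runtime.state.memory.MemoryStateBackend;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>> import org.apache.flink.types.Row;
>>> // OrcFileWriter comes from the FLINK-9407 PR.
>>>
>>> public class TestOrc {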
>>>    public static void main(String[] args) throws Exception {
>>>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>       env.setParallelism(1);
>>>       env.enableCheckpointing(1000);
>>>       env.setStateBackend(new MemoryStateBackend());
>>>
>>>       String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>>       String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>>
>>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>
>>>       bucketingSink
>>>          .setWriter(new OrcFileWriter<>(orcSchemaString))
>>>          .setInactiveBucketCheckInterval(2000)
>>>          .setInactiveBucketThreshold(2000);
>>>
>>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>
>>>       dataStream.addSink(bucketingSink);
>>>
>>>       env.execute();
>>>    }
>>>
>>>    public static class ManGenerator implements SourceFunction<Row> {
>>>
>>>       @Override
>>>       public void run(SourceContext<Row> ctx) throws Exception {
>>>          for (int i = 0; i < 2147483000; i++) {
>>>             Row row = new Row(3);
>>>             row.setField(0, "Sagar");
>>>             row.setField(1, 26 + i);
>>>             row.setField(2, false);
>>>             ctx.collect(row);
>>>          }
>>>       }
>>>
>>>       @Override
>>>       public void cancel() {
>>>
>>>       }
>>>    }
>>> }
>>>
>>> <filestatus.jpg>
>>>
>>>
>>>
>>> On Jun 22, 2018, at 11:14 AM, sagar loke <sagarit2@gmail.com> wrote:
>>>
>>> Sure, we can solve it together :)
>>>
>>> Are you able to reproduce it ?
>>>
>>> Thanks,
>>> Sagar
>>>
>>> On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838093@163.com>
>>> wrote:
>>>
>>>> Sagar, flush() is called whenever a checkpoint is taken. Please see:
>>>>
>>>> bucketState.currentFileValidLength = bucketState.writer.flush();
>>>>
>>>>
>>>>
>>>> @Override
>>>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>>    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
>>>>
>>>>    restoredBucketStates.clear();
>>>>
>>>>    synchronized (state.bucketStates) {
>>>>       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>>>>
>>>>       for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
>>>>          BucketState<T> bucketState = bucketStateEntry.getValue();
>>>>
>>>>          // Flush the open writer and remember the valid length of the current file.
>>>>          if (bucketState.isWriterOpen) {
>>>>             bucketState.currentFileValidLength = bucketState.writer.flush();
>>>>          }
>>>>
>>>>          // Record the files that are pending as of this checkpoint.
>>>>          synchronized (bucketState.pendingFilesPerCheckpoint) {
>>>>             bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
>>>>          }
>>>>          bucketState.pendingFiles = new ArrayList<>();
>>>>       }
>>>>       restoredBucketStates.add(state);
>>>>
>>>>       if (LOG.isDebugEnabled()) {
>>>>          LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
>>>>       }
>>>>    }
>>>> }
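
The bookkeeping in the snapshot code above, where the current list of pending files is stored under the checkpoint ID and a fresh list is started, can be mimicked in a few lines of plain Java. This is a sketch only: `PendingFilesSketch` and its methods are hypothetical names, and the completion step is an assumption (the quoted code does not show it), namely that a completed checkpoint finalizes every file recorded at or before that checkpoint ID.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy bookkeeping of pending files per checkpoint; names are illustrative, not Flink's API. */
final class PendingFilesSketch {

    private List<String> pendingFiles = new ArrayList<>();
    private final TreeMap<Long, List<String>> pendingFilesPerCheckpoint = new TreeMap<>();
    private final List<String> finishedFiles = new ArrayList<>();

    /** Called when a part file has been closed and is waiting for confirmation. */
    void filePending(String name) {
        pendingFiles.add(name);
    }

    /** Mirrors the snapshot step: park the current pending list under the checkpoint ID. */
    void snapshot(long checkpointId) {
        pendingFilesPerCheckpoint.put(checkpointId, pendingFiles);
        pendingFiles = new ArrayList<>();
    }

    /** Assumed completion step: finalize every file recorded at or before the completed checkpoint. */
    void checkpointCompleted(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingFilesPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            finishedFiles.addAll(it.next().getValue());
            it.remove();
        }
    }

    List<String> finishedFiles() {
        return finishedFiles;
    }

    public static void main(String[] args) {
        PendingFilesSketch sink = new PendingFilesSketch();
        sink.filePending("part-0-0");
        sink.snapshot(1);
        sink.filePending("part-0-1");             // closed after checkpoint 1 was taken
        sink.checkpointCompleted(1);
        System.out.println(sink.finishedFiles()); // [part-0-0]
        sink.snapshot(2);
        sink.checkpointCompleted(2);
        System.out.println(sink.finishedFiles()); // [part-0-0, part-0-1]
    }
}
```

Under these assumptions, a file that is closed after a snapshot has been taken waits for the following checkpoint before it leaves the pending state.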
>>>>
>>>>
>>>>
>>>> On Jun 22, 2018, at 10:21 AM, sagar loke <sagarit2@gmail.com> wrote:
>>>>
>>>> Thanks for replying.
>>>>
>>>> Yes, I tried different checkpoint intervals, e.g. 20, 100, and 5000.
>>>>
>>>> env.enableCheckpointing(100);
>>>>
>>>> But in all cases, I still see the .pending state.
>>>>
>>>> I am not sure whether it's related to the flush() method of
>>>> OrcFileWriter, which might not be getting called somehow?
>>>>
>>>> Thanks,
>>>> Sagar
>>>>
>>>> On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <18717838093@163.com>
>>>> wrote:
>>>>
>>>>> Hi, Sagar
>>>>>
>>>>> Please take a look at BucketingSink. It says that a file stays in the
>>>>> .pending status if you DO NOT take a checkpoint. The Javadoc says that
>>>>> when a checkpoint is successful, the currently pending files will be
>>>>> moved to {@code finished}.
>>>>> Please try again. I think you should call the method below and see
>>>>> what happens. I will also try it and see whether it works. Please let
>>>>> me know if you still get an error.
>>>>>
>>>>>  env.enableCheckpointing(200);
>>>>>
>>>>> /**
>>>>>  * The suffix for {@code pending} part files. These are closed files that we are
>>>>>  * not currently writing to (inactive or reached {@link #batchSize}), but which
>>>>>  * were not yet confirmed by a checkpoint.
>>>>>  */
>>>>> private static final String DEFAULT_PENDING_SUFFIX = ".pending";
>>>>>
>>>>> * <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
>>>>> * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
>>>>> * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
>>>>> * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
>>>>> * pending files will be moved to {@code finished}.
>>>>>
>>>>>
>>>>> Cheers
>>>>> Zhangminglei
>>>>>
>>>>>
>>>>>
>>>>> On Jun 22, 2018, at 4:46 AM, sagar loke <sagarit2@gmail.com> wrote:
>>>>>
>>>>> Thanks, Zhangminglei, for the quick response.
>>>>>
>>>>> I tried the above code and I am seeing another issue: the files
>>>>> created on HDFS are always in the *.pending* state.
>>>>>
>>>>> Let me know if you can reproduce it.
>>>>>
>>>>> Thanks,
>>>>> Sagar
>>>>>
>>>>> On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <18717838093@163.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, Sagar
>>>>>>
>>>>>> I ran a local test and it seems to work fine for me. The PR for
>>>>>> [FLINK-9407] will be updated.
>>>>>>
>>>>>> I will push the latest code to the PR soon. Below is the example I
>>>>>> used for my test, so you can check again. I hope it helps!
>>>>>>
>>>>>> Cheers
>>>>>> Zhangminglei.
>>>>>>
>>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>>>> import org.apache.flink.types.Row;
>>>>>>
>>>>>> public class TestOrc {
>>>>>>    public static void main(String[] args) throws Exception {
>>>>>>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>       env.setParallelism(1);
>>>>>>
>>>>>>       String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>>>>>       String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>>>>>
>>>>>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>>>>
>>>>>>       bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));
>>>>>>
>>>>>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>>>>
>>>>>>       dataStream.addSink(bucketingSink);
>>>>>>
>>>>>>       env.execute();
>>>>>>    }
>>>>>>
>>>>>>    public static class ManGenerator implements SourceFunction<Row> {
>>>>>>
>>>>>>       @Override
>>>>>>       public void run(SourceContext<Row> ctx) throws Exception {
>>>>>>          for (int i = 0; i < 3; i++) {
>>>>>>             Row row = new Row(3);
>>>>>>             row.setField(0, "Sagar");
>>>>>>             row.setField(1, 26 + i);
>>>>>>             row.setField(2, false);
>>>>>>             ctx.collect(row);
>>>>>>          }
>>>>>>       }
>>>>>>
>>>>>>       @Override
>>>>>>       public void cancel() {
>>>>>>
>>>>>>       }
>>>>>>    }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Jun 21, 2018, at 1:47 AM, sagar loke <sagarit2@gmail.com> wrote:
>>>>>>
>>>>>> Hi Zhangminglei,
>>>>>>
>>>>>> Question about https://issues.apache.org/jira/browse/FLINK-9407
>>>>>>
>>>>>> I tried to use the code from the PR and ran it on a local HDFS
>>>>>> cluster to write some ORC data.
>>>>>>
>>>>>> But somehow this code is failing with the following error:
>>>>>>
>>>>>>
>>>>>>
>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>>>>>>> Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress for
>>>>>>> DFSClient_NONMAPREDUCE_73219864_36 on 127.0.0.1 because this file lease is currently owned by
>>>>>>> DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500)
>>>>>>
>>>>>>
>>>>>> I understand that this error is related to Hadoop, but somehow I get
>>>>>> this error only when executing the code from this PR.
>>>>>>
>>>>>> I had created a very crude way to write an ORC file to HDFS, shown
>>>>>> below. The code below works fine and does not throw the above error.
>>>>>>
>>>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>>> import org.apache.hadoop.fs.Path;
>>>>>>> import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
>>>>>>> import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
>>>>>>> import org.apache.orc.OrcFile;
>>>>>>> import org.apache.orc.TypeDescription;
>>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>>>
>>>>>>> import java.io.IOException;
>>>>>>>
>>>>>>> public class FlinkOrcWriterV1<T> implements org.apache.flink.streaming.connectors.fs.Writer<T> {
>>>>>>>
>>>>>>>     private transient org.apache.orc.Writer orcWriter;
>>>>>>>     String schema;
>>>>>>>     TypeDescription typeDescriptionschema;//"struct<x:int,y:int>"
>>>>>>>     String basePath;
>>>>>>>
>>>>>>>     public FlinkOrcWriterV1(String schema) {
>>>>>>>         this.schema = schema;
>>>>>>>         this.typeDescriptionschema = TypeDescription.fromString(schema);
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>>>>>         Configuration conf = new Configuration();
>>>>>>>         orcWriter = OrcFile.createWriter(new Path("hdfs://localhost:9000/tmp/hivedata3/"),
>>>>>>>                     OrcFile.writerOptions(conf)
>>>>>>>                         .setSchema(typeDescriptionschema));
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public long flush() throws IOException {
>>>>>>>         return orcWriter.writeIntermediateFooter();
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public long getPos() throws IOException {
>>>>>>>         return orcWriter.getRawDataSize();
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void close() throws IOException {
>>>>>>>         orcWriter.close();
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void write(T element) throws IOException {
>>>>>>>         VectorizedRowBatch batch = typeDescriptionschema.createRowBatch(10);
>>>>>>>         LongColumnVector x = (LongColumnVector) batch.cols[0];
>>>>>>>         LongColumnVector y = (LongColumnVector) batch.cols[1];
>>>>>>>         for(int r=0; r < 10; ++r) {
>>>>>>>             int row = batch.size++;
>>>>>>>             x.vector[row] = r;
>>>>>>>             y.vector[row] = r * 3;
>>>>>>>             // If the batch is full, write it out and start over.
>>>>>>>             if (batch.size == batch.getMaxSize()) {
>>>>>>>                 orcWriter.addRowBatch(batch);
>>>>>>>                 batch.reset();
>>>>>>>             }
>>>>>>>         }
>>>>>>>         if (batch.size != 0) {
>>>>>>>             orcWriter.addRowBatch(batch);
>>>>>>>             batch.reset();
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public FlinkOrcWriterV1<T> duplicate() {
>>>>>>>         return new FlinkOrcWriterV1<>(schema);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> I am not sure whether the error is related to one of the Hadoop
>>>>>> dependencies or to something else.
>>>>>>
>>>>>> Can you please look into it and let me know if you can reproduce it
>>>>>> on your end too?
>>>>>>
>>>>>> By the way, these are the dependencies in my project:
>>>>>>
>>>>>> dependencies {
>>>>>>>     compile 'org.apache.flink:flink-java:1.4.2'
>>>>>>>     compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
>>>>>>>     compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
>>>>>>>     compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
>>>>>>>     compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
>>>>>>>     compile 'io.confluent:kafka-avro-serializer:3.3.0'
>>>>>>>     compile 'org.apache.flink:flink-avro:1.4.2'
>>>>>>>     compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
>>>>>>>     compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', version: '1.4.2'
>>>>>>>     compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
>>>>>>>     compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.4.2'
>>>>>>>     compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.2'
>>>>>>>     compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
>>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.0'
>>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-common', version: '1.10.0'
>>>>>>>     compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: '1.4.2'
>>>>>>>     testCompile group: 'junit', name: 'junit', version: '4.12'
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Sagar.
