gobblin-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Vicky Kak <vicky....@gmail.com>
Subject Re: Corrupted state file when Jobs are being run in parallel.
Date Sun, 12 Nov 2017 04:10:26 GMT
Hi Hung,

Please find my replies marked in blue

What's the underlying file system you are using?

I am using the normal file system, not the hdfs.
>>file:/home/Installable/gobblin-dist/working-dir/state-store/
FlickrPageExtractorPull_137/current.jst

>>Have you checked to make sure there are not multiple instances of the job
running concurrently or make use of the job lock?

We have the use case where the multiple instances of the Jobs will be run
in parallel ;)
We have the custom implementation of service which uses linked restli to
listen to the incoming rest calls, these rest calls will launch the jobs.
There will be multiple instances of Job running concurrently most of the
time. We have disabled the job lock. Please check this thread
https://groups.google.com/forum/#!topic/gobblin-users/_bTxGa3wZaI
We started developing the solution last year when GAAS was not release so
we landed up building our own custom modules which works fine but started
troubling us during the load test which we figured last week. Please check
this one
https://groups.google.com/forum/#!searchin/gobblin-users/vicky$20kak%7Csort:date/gobblin-users/kHrWh6lfGJM/iPBQSEcdBQAJ

>>Also, the state store and state are separate things. You can still pass
state from the source to the extractor without the state store enabled. The
state store is used to transfer state across >>executions, like cases where
a watermark is used to resume an incremental pull.

This is a relief, let me try it out and see what happens. I think this is
what is required for our use case.

Thanks,
Vicky


On Sun, Nov 12, 2017 at 3:32 AM, Hung Tran <hutran@linkedin.com> wrote:

> Hi Vicky,
>
>
> What's the underlying file system you are using?
>
>
> Have you checked to make sure there are not multiple instances of the job
> running concurrently or make use of the job lock?
>
>
> Also, the state store and state are separate things. You can still pass
> state from the source to the extractor without the state store enabled. The
> state store is used to transfer state across executions, like cases where a
> watermark is used to resume an incremental pull.
>
>
> Hung.
> ------------------------------
> *From:* Vicky Kak <vicky.kak@gmail.com>
> *Sent:* Saturday, November 11, 2017 5:58:59 AM
> *To:* user@gobblin.incubator.apache.org; dev@gobblin.incubator.apache.org
> *Subject:* Corrupted state file when Jobs are being run in parallel.
>
> Hi Guys,
>
> I have been running the stress tests and am seeing the following errors
>
> Error 1
> ************************************************************
> *********************************************************************
> 017-11-11 11:20:56 UTC INFO  [pool-11-thread-421]
> org.apache.hadoop.fs.FSInputChecker  284 - Found checksum error: b[0,
> 512]=53455106196f72672e6170616368652e6861646f6f702e696f2e54657874
> 25676f62626c696e2e72756e74696d652e4a6f6253746174652444617461
> 736574537461746501012a6f72672e6170616368652e6861646f6f702e69
> 6f2e636f6d70726573732e44656661756c74436f6465630000000044e218
> b9e6aad3f1aa97f2210fb5c7f0ffffffff44e218b9e6aad3f1aa97f2210f
> b5c7f00109789c6304000002000209789c630000000100010b789cebb3d5
> 0200025100f68e0ab4789ced5b7b73dbc611971c3b8d5ff233b6d324ad86
> 1337e9d804013e4451692643d1924c51a26489962da71ece013890270238
> f870904479fc1592ffdb4fd1e9f4b364a6dfa3ff770f0fbe244384eca6c9
> 98d2f081bbddc5deede2f6777bcbcf974da275d8266ae1a543ce90c629db
> f44cb3a9e48ae93daa3663fa9b4a419195ac5c2a147373a5a9a9e9e6df4a
> df3c0a3ee7ff39e5ff5da7172b1bebebd54663097aa6a6c52b995c9923b7
> 333e79530e15f93954e41f8122d7fe0d6f8f6fe0805bb291855d0769f8ae
> e14b961c102da17d4625576b630b5d7ae561d6954c64b7ce75d817420986
> 39b4f036c348772835250b1dbae4084f672fba1c1a2d89e85f159031870d
> 944fe7545d4be70b46313d5f9071ba24e772459445322aea331479bc2df9
> 6f1e33bf6d73eeb80b998c4d74506c1f3349a356c627ca4a72467c520637fa9e
> org.apache.hadoop.fs.ChecksumException: Checksum error:
> file:/home/Installable/gobblin-dist/working-dir/state-store/
> FlickrPageExtractorPull_137/current.jst
> at 0 exp: 36820587 got: 91149211
>         at
> org.apache.hadoop.fs.FSInputChecker.verifySums(FSInputChecker.java:322)
>         at
> org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(
> FSInputChecker.java:278)
>         at org.apache.hadoop.fs.FSInputChecker.fill(
> FSInputChecker.java:213)
>         at
> org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:231)
>         at org.apache.hadoop.fs.FSInputChecker.read(
> FSInputChecker.java:195)
>         at java.io.DataInputStream.readFully(DataInputStream.java:195)
>         at java.io.DataInputStream.readFully(DataInputStream.java:169)
>         at
> org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1845)
>         at
> org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:
> 1810)
>         at
> org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1759)
>         at
> org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773)
>         at
> gobblin.runtime.FsDatasetStateStore.getAll(FsDatasetStateStore.java:119)
>         at
> gobblin.runtime.FsDatasetStateStore.getLatestDatasetStatesByUrns(
> FsDatasetStateStore.java:173)
>         at gobblin.runtime.JobContext.<init>(JobContext.java:136)
>         at
> gobblin.runtime.AbstractJobLauncher.<init>(AbstractJobLauncher.java:131)
>         at
> gobblin.runtime.local.LocalJobLauncher.<init>(LocalJobLauncher.java:62)
>         at
> gobblin.runtime.JobLauncherFactory.newJobLauncher(
> JobLauncherFactory.java:80)
>         at
> gobblin.runtime.JobLauncherFactory.newJobLauncher(
> JobLauncherFactory.java:59)
>         at com.bph.JobLauncherResource.search(JobLauncherResource.
> java:107)
>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> com.linkedin.restli.internal.server.RestLiMethodInvoker.
> doInvoke(RestLiMethodInvoker.java:186)
>         at
> com.linkedin.restli.internal.server.RestLiMethodInvoker.
> invoke(RestLiMethodInvoker.java:141)
>         at
> com.linkedin.restli.server.RestLiServer.handleResourceRequest(
> RestLiServer.java:286)
>         at
> com.linkedin.restli.server.RestLiServer.doHandleRequest(
> RestLiServer.java:167)
>         at
> com.linkedin.restli.server.BaseRestServer.handleRequest(
> BaseRestServer.java:56)
>         at
> com.linkedin.restli.server.DelegatingTransportDispatcher.
> handleRestRequest(DelegatingTransportDispatcher.java:56)
>         at
> com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(
> DispatcherRequestFilter.java:81)
>         at
> com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.
> onRequest(FilterChainImpl.java:328)
>         at
> com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
>         at
> com.linkedin.r2.filter.FilterChainIterator.onRequest(
> FilterChainIterator.java:50)
>         at
> com.linkedin.r2.filter.compression.ServerCompressionFilter.onRestRequest(
> ServerCompressionFilter.java:126)
>         at
> com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.
> onRequest(FilterChainImpl.java:328)
>         at
> com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
>         at
> com.linkedin.r2.filter.FilterChainIterator.onRequest(
> FilterChainIterator.java:50)
>         at
> com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:59)
>         at
> com.linkedin.r2.filter.FilterChainIterator.onRequest(
> FilterChainIterator.java:50)
>         at
> com.linkedin.r2.filter.FilterChainImpl.onRestRequest(
> FilterChainImpl.java:103)
>         at
> com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(
> FilterChainDispatcher.java:74)
>         at
> com.linkedin.r2.transport.http.server.HttpDispatcher.
> handleRequest(HttpDispatcher.java:95)
>         at
> com.linkedin.r2.transport.http.server.HttpDispatcher.
> handleRequest(HttpDispatcher.java:62)
>         at
> com.linkedin.r2.transport.http.server.HttpNettyServer$
> Handler.messageReceived(HttpNettyServer.java:171)
>         at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(
> SimpleChannelUpstreamHandler.java:80)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(
> DefaultChannelPipeline.java:545)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline$
> DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)
>         at
> org.jboss.netty.handler.execution.ChannelEventRunnable.run(
> ChannelEventRunnable.java:69)
>         at
> org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolEx
> ecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:316)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 2017-11-11 11:20:56 UTC ERROR [pool-11-thread-421]
> com.bph.JobLauncherResource  110 -  Job Id fk_137 failed while searching
> key beryls Failed to create job launcher:
> org.apache.hadoop.fs.ChecksumException: Checksum error:
> file:/home/Installable/gobblin-dist/working-dir/state-store/
> FlickrPageExtractorPull_137/current.jst
> at 0 exp: 36820587 got: 91149211
> 2017-11-11 11:20:56 UTC INFO  [pool-11-thread-402]
> gobblin.util.ExecutorsUtils  125 - Attempting to shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@6bce96a5[Shutting down, pool size
> =
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> 2017-11-11 11:20:56 UTC INFO  [pool-11-thread-402]
> gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@6bce96a5[Terminated, pool size =
> 0,
> active threads = 0, queued tasks = 0, completed tasks = 1]
>
> ************************************************************
> *********************************************************************
>
> Error 2:
> ************************************************************
> *********************************************************************
>
> 2017-11-10 10:24:10 UTC WARN  [pool-11-thread-13]
> org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker  154 -
> Problem opening checksum file:
> file:/home/Installable/gobblin-dist/working-dir/state-store/
> YoutubePageExtractorPull_138/current.jst.
> Ignoring exception:
> java.io.EOFException
>         at java.io.DataInputStream.readFully(DataInputStream.java:197)
>         at java.io.DataInputStream.readFully(DataInputStream.java:169)
>         at
> org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(
> ChecksumFileSystem.java:146)
>         at
> org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:339)
>         at
> org.apache.hadoop.io.SequenceFile$Reader.openFile(SequenceFile.java:1832)
>         at
> org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1752)
>         at
> org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773)
>         at
> gobblin.runtime.FsDatasetStateStore.getAll(FsDatasetStateStore.java:119)
>         at
> gobblin.runtime.FsDatasetStateStore.getLatestDatasetStatesByUrns(
> FsDatasetStateStore.java:173)
>         at gobblin.runtime.JobContext.<init>(JobContext.java:136)
>         at
> gobblin.runtime.AbstractJobLauncher.<init>(AbstractJobLauncher.java:131)
>         at
> gobblin.runtime.local.LocalJobLauncher.<init>(LocalJobLauncher.java:62)
>         at
> gobblin.runtime.JobLauncherFactory.newJobLauncher(
> JobLauncherFactory.java:80)
>         at
> gobblin.runtime.JobLauncherFactory.newJobLauncher(
> JobLauncherFactory.java:59)
>         at com.bph.JobLauncherResource.search(JobLauncherResource.
> java:107)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> 62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> com.linkedin.restli.internal.server.RestLiMethodInvoker.
> doInvoke(RestLiMethodInvoker.java:186)
>         at
> com.linkedin.restli.internal.server.RestLiMethodInvoker.
> invoke(RestLiMethodInvoker.java:141)
>         at
> com.linkedin.restli.server.RestLiServer.handleResourceRequest(
> RestLiServer.java:286)
>         at
> com.linkedin.restli.server.RestLiServer.doHandleRequest(
> RestLiServer.java:167)
>         at
> com.linkedin.restli.server.BaseRestServer.handleRequest(
> BaseRestServer.java:56)
>         at
> com.linkedin.restli.server.DelegatingTransportDispatcher.
> handleRestRequest(DelegatingTransportDispatcher.java:56)
>         at
> com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(
> DispatcherRequestFilter.java:81)
>         at
> com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.
> onRequest(FilterChainImpl.java:328)
>         at
> com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
>         at
> com.linkedin.r2.filter.FilterChainIterator.onRequest(
> FilterChainIterator.java:50)
>         at
> com.linkedin.r2.filter.compression.ServerCompressionFilter.onRestRequest(
> ServerCompressionFilter.java:126)
>         at
> com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.
> onRequest(FilterChainImpl.java:328)
>         at
> com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
>         at
> com.linkedin.r2.filter.FilterChainIterator.onRequest(
> FilterChainIterator.java:50)
>         at
> com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:59)
>         at
> com.linkedin.r2.filter.FilterChainIterator.onRequest(
> FilterChainIterator.java:50)
>         at
> com.linkedin.r2.filter.FilterChainImpl.onRestRequest(
> FilterChainImpl.java:103)
>         at
> com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(
> FilterChainDispatcher.java:74)
>         at
> com.linkedin.r2.transport.http.server.HttpDispatcher.
> handleRequest(HttpDispatcher.java:95)
>         at
> com.linkedin.r2.transport.http.server.HttpDispatcher.
> handleRequest(HttpDispatcher.java:62)
>         at
> com.linkedin.r2.transport.http.server.HttpNettyServer$
> Handler.messageReceived(HttpNettyServer.java:171)
>         at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(
> SimpleChannelUpstreamHandler.java:80)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(
> DefaultChannelPipeline.java:545)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline$
> DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)
>         at
> org.jboss.netty.handler.execution.ChannelEventRunnable.run(
> ChannelEventRunnable.java:69)
>         at
> org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolEx
> ecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:316)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 2017-11-10 10:24:11 UTC ERROR [pool-11-thread-13]
> com.bph.JobLauncherResource  110 -  Job Id yt_138 failed while searching
> key ostfold Failed to create job launcher: java.io.EOFException
>
> ************************************************************
> *********************************************************************
>
> Error 3
> ************************************************************
> *********************************************************************
> 2017-11-10 13:38:49 UTC ERROR [Commit-thread-0]
> gobblin.runtime.SafeDatasetCommit  118 - Failed to persist dataset state
> for dataset  of job job_TwitterPageExtractorPull_135_1510321111647
> java.io.FileNotFoundException: Failed to rename
> /home/Installable/gobblin-dist/working-dir/state-store/
> TwitterPageExtractorPull_135/_tmp_/current.jst
> to
> /home/Installable/gobblin-dist/working-dir/state-store/
> TwitterPageExtractorPull_135/current.jst:
> src not found
> at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:173)
> at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:164)
> at gobblin.util.HadoopUtils.copyFile(HadoopUtils.java:333)
> at gobblin.metastore.FsStateStore.createAlias(FsStateStore.java:283)
> at
> gobblin.runtime.FsDatasetStateStore.persistDatasetState(
> FsDatasetStateStore.java:221)
> at
> gobblin.runtime.SafeDatasetCommit.persistDatasetState(
> SafeDatasetCommit.java:255)
> at gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:115)
> at gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:43)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> ************************************************************
> *********************************************************************
>
> There errors are seeing during the stress tests for the same Jobs. For our
> use case we can't afford the jobs to fail due to system issue like above. I
> did some basic investigation and could find the issue could be happening to
> to non atomic operations on the state file which is of extension .jst. It
> seems we could disable the statestore, I looked at the following code in
> gobblin.runtime.JobContext::createStateStore
> ************************************************************
> *********************************************************************
> if (jobProps.containsKey(ConfigurationKeys.STATE_STORE_ENABLED) &&
>
> !Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.STATE_STORE_
> ENABLED)))
> {
>       return new NoopDatasetStateStore(stateStoreFs, stateStoreRootDir);
>     } else {
>       return new FsDatasetStateStore(stateStoreFs, stateStoreRootDir);
>     }
> ************************************************************
> *********************************************************************
>
> It seems that by disabling the statestore we may get over this issue, but
> for our case the source implementation is passing the information to the
> Extractor via the WorkUnitStoreState.
>
>
> We don't want the Job Retry features and hence did disable it as explained
> here
> https://gobblin.readthedocs.io/en/latest/user-guide/
> Configuration-Properties-Glossary/#retry-properties
>
> I was expecting the disabling happening by setting the follwing only
> workunit.retry.enabled=false
> we have to set this also
> task.maxretries=0
> As we don't rely on retries would it not be good to have a flag what will
> ignore the the following calls when we have have
> workunit.retry.enabled=false
>
> 1) Reading the initial value from the store
> 2) Commit the final state to the store.
>
> As mentioned about we can't disable the state store as we are generating
> some data in the Source implementation and passed to the corresponding
> Extractor implementation via State.
>
> I do anticipate of having these issues in GAAS too.
>
> I will be working to fix this issue as this is a critical issue for us.
>
> Thanks,
> Vicky
>

Mime
View raw message