gobblin-user mailing list archives

From Hung Tran <hut...@linkedin.com>
Subject Re: Corrupted state file when Jobs are being run in parallel.
Date Sat, 11 Nov 2017 22:02:19 GMT
Hi Vicky,


What's the underlying file system you are using?


Have you checked to make sure there are not multiple instances of the job running concurrently?
If there may be, you can make use of the job lock.
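For reference, the job lock can be enabled per job with a single property; a minimal sketch (key name taken from the configuration glossary, so double-check against your Gobblin version):

```properties
# Reject a run if another instance of the same job already holds the lock
job.lock.enabled=true
```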


Also, the state store and state are separate things. You can still pass state from the source
to the extractor without the state store enabled. The state store is used to transfer state
across executions, such as when a watermark is used to resume an incremental pull.
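If you do want to disable the state store entirely, that is also a single property; a sketch (key name assumed from ConfigurationKeys.STATE_STORE_ENABLED, so verify against your version):

```properties
# Use NoopDatasetStateStore instead of FsDatasetStateStore
state.store.enabled=false
```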


Hung.

________________________________
From: Vicky Kak <vicky.kak@gmail.com>
Sent: Saturday, November 11, 2017 5:58:59 AM
To: user@gobblin.incubator.apache.org; dev@gobblin.incubator.apache.org
Subject: Corrupted state file when Jobs are being run in parallel.

Hi Guys,

I have been running the stress tests and am seeing the following errors

Error 1
*********************************************************************************************************************************
2017-11-11 11:20:56 UTC INFO  [pool-11-thread-421]
org.apache.hadoop.fs.FSInputChecker  284 - Found checksum error: b[0,
512]=53455106196f72672e6170616368652e6861646f6f702e696f2e5465787425676f62626c696e2e72756e74696d652e4a6f6253746174652444617461736574537461746501012a6f72672e6170616368652e6861646f6f702e696f2e636f6d70726573732e44656661756c74436f6465630000000044e218b9e6aad3f1aa97f2210fb5c7f0ffffffff44e218b9e6aad3f1aa97f2210fb5c7f00109789c6304000002000209789c630000000100010b789cebb3d50200025100f68e0ab4789ced5b7b73dbc611971c3b8d5ff233b6d324ad861337e9d804013e4451692643d1924c51a26489962da71ece013890270238f870904479fc1592ffdb4fd1e9f4b364a6dfa3ff770f0fbe244384eca6c998d2f081bbddc5deede2f6777bcbcf974da275d8266ae1a543ce90c629dbf44cb3a9e48ae93daa3663fa9b4a419195ac5c2a147373a5a9a9e9e6df4adf3c0a3ee7ff39e5ff5da7172b1bebebd54663097aa6a6c52b995c9923b7333e79530e15f93954e41f8122d7fe0d6f8f6fe0805bb291855d0769f8aee14b961c102da17d4625576b630b5d7ae561d6954c64b7ce75d81742098639b4f036c348772835250b1dbae4084f672fba1c1a2d89e85f159031870d944fe7545d4be70b46313d5f9071ba24e772459445322aea331479bc2df96f1e33bf6d73eeb80b998c4d74506c1f3349a356c627ca4a72467c520637fa9e
org.apache.hadoop.fs.ChecksumException: Checksum error:
file:/home/Installable/gobblin-dist/working-dir/state-store/FlickrPageExtractorPull_137/current.jst
at 0 exp: 36820587 got: 91149211
        at org.apache.hadoop.fs.FSInputChecker.verifySums(FSInputChecker.java:322)
        at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:278)
        at org.apache.hadoop.fs.FSInputChecker.fill(FSInputChecker.java:213)
        at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:231)
        at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1845)
        at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1810)
        at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1759)
        at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773)
        at gobblin.runtime.FsDatasetStateStore.getAll(FsDatasetStateStore.java:119)
        at gobblin.runtime.FsDatasetStateStore.getLatestDatasetStatesByUrns(FsDatasetStateStore.java:173)
        at gobblin.runtime.JobContext.<init>(JobContext.java:136)
        at gobblin.runtime.AbstractJobLauncher.<init>(AbstractJobLauncher.java:131)
        at gobblin.runtime.local.LocalJobLauncher.<init>(LocalJobLauncher.java:62)
        at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:80)
        at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:59)
        at com.bph.JobLauncherResource.search(JobLauncherResource.java:107)
        at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.linkedin.restli.internal.server.RestLiMethodInvoker.doInvoke(RestLiMethodInvoker.java:186)
        at com.linkedin.restli.internal.server.RestLiMethodInvoker.invoke(RestLiMethodInvoker.java:141)
        at com.linkedin.restli.server.RestLiServer.handleResourceRequest(RestLiServer.java:286)
        at com.linkedin.restli.server.RestLiServer.doHandleRequest(RestLiServer.java:167)
        at com.linkedin.restli.server.BaseRestServer.handleRequest(BaseRestServer.java:56)
        at com.linkedin.restli.server.DelegatingTransportDispatcher.handleRestRequest(DelegatingTransportDispatcher.java:56)
        at com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(DispatcherRequestFilter.java:81)
        at com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328)
        at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
        at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
        at com.linkedin.r2.filter.compression.ServerCompressionFilter.onRestRequest(ServerCompressionFilter.java:126)
        at com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328)
        at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
        at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
        at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:59)
        at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
        at com.linkedin.r2.filter.FilterChainImpl.onRestRequest(FilterChainImpl.java:103)
        at com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(FilterChainDispatcher.java:74)
        at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:95)
        at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:62)
        at com.linkedin.r2.transport.http.server.HttpNettyServer$Handler.messageReceived(HttpNettyServer.java:171)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)
        at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:69)
        at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:316)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2017-11-11 11:20:56 UTC ERROR [pool-11-thread-421]
com.bph.JobLauncherResource  110 -  Job Id fk_137 failed while searching
key beryls Failed to create job launcher:
org.apache.hadoop.fs.ChecksumException: Checksum error:
file:/home/Installable/gobblin-dist/working-dir/state-store/FlickrPageExtractorPull_137/current.jst
at 0 exp: 36820587 got: 91149211
2017-11-11 11:20:56 UTC INFO  [pool-11-thread-402]
gobblin.util.ExecutorsUtils  125 - Attempting to shutdown ExecutorService:
java.util.concurrent.ThreadPoolExecutor@6bce96a5[Shutting down, pool size =
1, active threads = 0, queued tasks = 0, completed tasks = 1]
2017-11-11 11:20:56 UTC INFO  [pool-11-thread-402]
gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService:
java.util.concurrent.ThreadPoolExecutor@6bce96a5[Terminated, pool size = 0,
active threads = 0, queued tasks = 0, completed tasks = 1]

*********************************************************************************************************************************

Error 2:
*********************************************************************************************************************************

2017-11-10 10:24:10 UTC WARN  [pool-11-thread-13]
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker  154 -
Problem opening checksum file:
file:/home/Installable/gobblin-dist/working-dir/state-store/YoutubePageExtractorPull_138/current.jst.
Ignoring exception:
java.io.EOFException
        at java.io.DataInputStream.readFully(DataInputStream.java:197)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:146)
        at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:339)
        at org.apache.hadoop.io.SequenceFile$Reader.openFile(SequenceFile.java:1832)
        at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1752)
        at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773)
        at gobblin.runtime.FsDatasetStateStore.getAll(FsDatasetStateStore.java:119)
        at gobblin.runtime.FsDatasetStateStore.getLatestDatasetStatesByUrns(FsDatasetStateStore.java:173)
        at gobblin.runtime.JobContext.<init>(JobContext.java:136)
        at gobblin.runtime.AbstractJobLauncher.<init>(AbstractJobLauncher.java:131)
        at gobblin.runtime.local.LocalJobLauncher.<init>(LocalJobLauncher.java:62)
        at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:80)
        at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:59)
        at com.bph.JobLauncherResource.search(JobLauncherResource.java:107)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.linkedin.restli.internal.server.RestLiMethodInvoker.doInvoke(RestLiMethodInvoker.java:186)
        at com.linkedin.restli.internal.server.RestLiMethodInvoker.invoke(RestLiMethodInvoker.java:141)
        at com.linkedin.restli.server.RestLiServer.handleResourceRequest(RestLiServer.java:286)
        at com.linkedin.restli.server.RestLiServer.doHandleRequest(RestLiServer.java:167)
        at com.linkedin.restli.server.BaseRestServer.handleRequest(BaseRestServer.java:56)
        at com.linkedin.restli.server.DelegatingTransportDispatcher.handleRestRequest(DelegatingTransportDispatcher.java:56)
        at com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(DispatcherRequestFilter.java:81)
        at com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328)
        at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
        at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
        at com.linkedin.r2.filter.compression.ServerCompressionFilter.onRestRequest(ServerCompressionFilter.java:126)
        at com.linkedin.r2.filter.FilterChainImpl$RestRequestFilterAdapter.onRequest(FilterChainImpl.java:328)
        at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:55)
        at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
        at com.linkedin.r2.filter.ComposedFilter.onRequest(ComposedFilter.java:59)
        at com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:50)
        at com.linkedin.r2.filter.FilterChainImpl.onRestRequest(FilterChainImpl.java:103)
        at com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(FilterChainDispatcher.java:74)
        at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:95)
        at com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:62)
        at com.linkedin.r2.transport.http.server.HttpNettyServer$Handler.messageReceived(HttpNettyServer.java:171)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)
        at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:69)
        at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:316)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2017-11-10 10:24:11 UTC ERROR [pool-11-thread-13]
com.bph.JobLauncherResource  110 -  Job Id yt_138 failed while searching
key ostfold Failed to create job launcher: java.io.EOFException

*********************************************************************************************************************************

Error 3
*********************************************************************************************************************************
2017-11-10 13:38:49 UTC ERROR [Commit-thread-0]
gobblin.runtime.SafeDatasetCommit  118 - Failed to persist dataset state
for dataset  of job job_TwitterPageExtractorPull_135_1510321111647
java.io.FileNotFoundException: Failed to rename
/home/Installable/gobblin-dist/working-dir/state-store/TwitterPageExtractorPull_135/_tmp_/current.jst
to
/home/Installable/gobblin-dist/working-dir/state-store/TwitterPageExtractorPull_135/current.jst:
src not found
at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:173)
at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:164)
at gobblin.util.HadoopUtils.copyFile(HadoopUtils.java:333)
at gobblin.metastore.FsStateStore.createAlias(FsStateStore.java:283)
at gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:221)
at gobblin.runtime.SafeDatasetCommit.persistDatasetState(SafeDatasetCommit.java:255)
at gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:115)
at gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:43)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
*********************************************************************************************************************************

These errors are seen during stress tests of the same jobs. For our use case we
can't afford for jobs to fail due to system issues like the ones above. I did
some basic investigation and found that the issue may be caused by non-atomic
operations on the state file, which has the extension .jst. It seems we could
disable the state store; I looked at the following code in
gobblin.runtime.JobContext::createStateStore:
*********************************************************************************************************************************
if (jobProps.containsKey(ConfigurationKeys.STATE_STORE_ENABLED) &&
    !Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.STATE_STORE_ENABLED))) {
  return new NoopDatasetStateStore(stateStoreFs, stateStoreRootDir);
} else {
  return new FsDatasetStateStore(stateStoreFs, stateStoreRootDir);
}
*********************************************************************************************************************************
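The fix I have in mind is the standard write-to-temp-then-atomic-rename pattern. A minimal JDK-only sketch of the idea (this is an illustration of the pattern, not Gobblin's actual FsStateStore code; the class and method names here are hypothetical):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicStateWrite {

    // Write the new state into a temp file in the same directory, then
    // atomically rename it over the current file, so a concurrent reader
    // sees either the complete old state or the complete new state.
    public static void writeState(Path stateFile, byte[] contents) throws IOException {
        Path tmp = stateFile.resolveSibling(stateFile.getFileName() + ".tmp");
        Files.write(tmp, contents);
        // ATOMIC_MOVE fails rather than silently falling back to a
        // non-atomic copy+delete if the file system cannot rename atomically.
        Files.move(tmp, stateFile,
                   StandardCopyOption.REPLACE_EXISTING,
                   StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("state-store");
        Path state = dir.resolve("current.jst");
        writeState(state, "v1".getBytes(StandardCharsets.UTF_8));
        writeState(state, "v2".getBytes(StandardCharsets.UTF_8));
        // A reader can never observe a partially written current.jst
        System.out.println(new String(Files.readAllBytes(state), StandardCharsets.UTF_8));
    }
}
```

Note that even an atomic rename does not help if two concurrent jobs share the same temp path, which is what Error 3 above suggests; each writer would also need a unique temp file name.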

It seems that by disabling the state store we might get around this issue, but
in our case the source implementation passes information to the extractor via
the WorkUnitState.


We don't want the job retry feature and hence disabled it as explained here:
https://gobblin.readthedocs.io/en/latest/user-guide/Configuration-Properties-Glossary/#retry-properties

I was expecting that setting only the following would disable it:
workunit.retry.enabled=false
but it turns out we also have to set:
task.maxretries=0

Since we don't rely on retries, would it not be good to have a flag that, when
workunit.retry.enabled=false is set, skips the following calls:

1) Reading the initial state from the store.
2) Committing the final state to the store.
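Concretely, both retry-related properties together in the job configuration look like this (key names per the configuration glossary linked above):

```properties
# Disable work-unit level retries
workunit.retry.enabled=false
# Disable task-level retries as well
task.maxretries=0
```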

As mentioned above, we can't disable the state store, because we generate some
data in the Source implementation that is passed to the corresponding Extractor
implementation via State.

I anticipate having these issues in GaaS too.

I will be working on a fix, as this is a critical issue for us.

Thanks,
Vicky
