carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Crabo Yang (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (CARBONDATA-906) Always OOM error when import large dataset (100 million rows)
Date Wed, 12 Apr 2017 09:18:41 GMT

    [ https://issues.apache.org/jira/browse/CARBONDATA-906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965585#comment-15965585
] 

Crabo Yang edited comment on CARBONDATA-906 at 4/12/17 9:18 AM:
----------------------------------------------------------------

1.oozie spark-opts
<spark-opts>
--jars rds.importer-1.0-SNAPSHOT.jar,carbondata_2.10-1.0.0-incubating-shade-hadoop2.6.0-cdh5.7.0.jar

--num-executors 12 --executor-cores 4 --executor-memory 13G
--conf spark.yarn.executor.memoryOverhead=5120
--conf spark.executor.heartbeatInterval=10000000
--conf spark.network.timeout=10000000
</spark-opts>

2.create script 
CREATE TABLE IF NOT EXISTS dmp_trade(id STRING,buyerNick STRING,buyerAlipayNO STRING,clientType
STRING,sellerNick STRING,receiverName STRING,receiverMobile STRING,receiverPhone STRING,receiverCountry
STRING,receiverState STRING,receiverCity STRING,receiverDistrict STRING,receiverTown STRING,receiverAddress
STRING,receiverZip STRING,status STRING,tradeFrom STRING,type STRING,stepTradeStatus STRING,shippingType
STRING,title STRING,buyerMessage STRING,buyerMemo STRING,rxAuditStatus STRING,buyerEmail STRING,picPath
STRING,shopPick STRING,creditCardFee STRING,markDesc STRING,sellerMemo STRING,invoiceName
STRING,invoiceType STRING,tradeAttr STRING,esRange STRING,esDate STRING,osDate STRING,osRange
STRING,o2oSnatchStatus STRING,market STRING,etType STRING,obs STRING,tradeOriginalJson STRING,point
STRING,omniAttr STRING,omniParam STRING,identity STRING,omnichannelParam STRING,assembly STRING,tradeId
BIGINT,itemId BIGINT,platFormId INT,num INT,sellerFlag INT,naSource INT,etShopId INT,forbidConsign
INT,buyerFlag INT,topHold INT,nvoiceKind INT,payment STRING,price STRING,totalFee STRING,discountFee
STRING,postFee STRING,stepPaidFee STRING,adjustFee STRING,buyerCodFee STRING,orderTaxFee STRING,couponFee
STRING,paidCouponFee STRING,sellerRate STRING,buyerRate STRING,postGateDeclare STRING,crossBondedDeclare
STRING,hasBuyerMessage STRING,hasPostFee STRING,isShShip STRING,created TIMESTAMP,payTime
TIMESTAMP,modified TIMESTAMP,endTime TIMESTAMP,consignTime TIMESTAMP,estConTime TIMESTAMP)
STORED BY 'carbondata';

3.carbon.properties
#Mandatory. Carbon Store path
carbon.storelocation=hdfs://master.nascent.com:8020/Opt/CarbonStore
#Base directory for Data files
carbon.ddl.base.hdfs.url=hdfs://master.nascent.com:8020/opt/data
#Path where the bad records are stored
carbon.badRecords.location=/opt/Carbon/Spark/badrecords
#Mandatory. path to kettle home
carbon.kettle.home=/usr/lib/spark/carbonlib/carbonplugins

carbon.load.use.batch.sort=true
enable.unsafe.sort=true
offheap.sort.chunk.size.inmb=1024
carbon.load.batch.sort.size.inmb=450
#File read buffer size used during sorting(in MB) :MIN=1:MAX=100
carbon.sort.file.buffer.size=10
#Rowset size exchanged between data load graph steps :MIN=500:MAX=1000000
carbon.graph.rowset.size=10000
#Number of cores to be used while data loading
carbon.number.of.cores.while.loading=6
#Record count to sort and write to temp intermediate files
carbon.sort.size=500000
#Algorithm for hashmap for hashkey calculation
carbon.enableXXHash=true
#Number of cores to be used for block sort while dataloading
#carbon.number.of.cores.block.sort=7
#max level cache size up to which level cache will be loaded in memory
#carbon.max.level.cache.size=-1
#enable prefetch of data during merge sort while reading data from sort temp files in data loading
#carbon.merge.sort.prefetch=true

#Number of cores to be used while compacting
carbon.number.of.cores.while.compacting=8
#For minor compaction, Number of segments to be merged in stage 1, number of compacted segments to be merged in stage 2.
carbon.compaction.level.threshold=4,3
#default size (in MB) for major compaction to be triggered
carbon.major.compaction.size=1024
#Query Configuration
#Number of cores to be used for loading index into memory
carbon.number.of.cores=8
#Number of records to be in memory while querying :MIN=100000:MAX=240000
carbon.inmemory.record.size=120000
#Improves the performance of filter query
carbon.enable.quick.filter=false
##number of cores to load the blocks in driver
#no.of.cores.to.load.blocks.in.driver=10

#Extra Configuration
##Timestamp format of input data used for timestamp data type.
#carbon.timestamp.format=yyyy-MM-dd HH:mm:ss

##File write buffer size used during sorting.
#carbon.sort.file.write.buffer.size=10485760
##Locking mechanism for data loading on a table
carbon.lock.type=HDFSLOCK
##Minimum no of intermediate files after which sort merged to be started.
#carbon.sort.intermediate.files.limit=20
##space reserved in percentage for writing block meta data in carbon data file
#carbon.block.meta.size.reserved.percentage=10
##csv reading buffer size.
#carbon.csv.read.buffersize.byte=1048576
##To identify and apply compression for non-high cardinality columns
#high.cardinality.value=100000
##maximum no of threads used for reading intermediate files for final merging.
#carbon.merge.sort.reader.thread=3
##Carbon blocklet size. Note: this configuration cannot be changed once store is generated
#carbon.blocklet.size=120000
##number of retries to get the metadata lock for loading data to table
#carbon.load.metadata.lock.retries=3
##Minimum blocklets needed for distribution.
#carbon.blockletdistribution.min.blocklet.size=10
##Interval between the retries to get the lock
#carbon.load.metadata.lock.retry.timeout.sec=5
##Temporary store location, By default it will take System.getProperty("java.io.tmpdir")
#carbon.tempstore.location=/opt/Carbon/TempStoreLoc
##data loading records count logger
#carbon.load.log.counter=500000

##to specify number of segments to be preserved from compaction
#carbon.numberof.preserve.segments=0
##To determine the loads of number of days to be compacted
#carbon.allowed.compaction.days=0
##To enable compaction while data loading
#carbon.enable.auto.load.merge=false

##Maximum time allowed for one query to be executed.
max.query.execution.time=60
##Min max is a feature added to enhance query performance. To disable this feature, set it to false.
carbon.enableMinMax=true

##To enable/disable identify high cardinality during first data loading
#high.cardinality.identify.enable=true
##threshold to identify whether high cardinality column
#high.cardinality.threshold=1000000
##Percentage to identify whether column cardinality is more than configured percent of total row count
#high.cardinality.row.count.percentage=80
##The property to set the date to be considered as start date for calculating the timestamp.
#carbon.cutOffTimestamp=2000-01-01 00:00:00
##The property to set the timestamp (ie millis) conversion to the SECOND, MINUTE, HOUR or DAY level.
#carbon.timegranularity=SECOND


was (Author: crabo):
1.oozie spark-opts
<spark-opts>
--jars rds.importer-1.0-SNAPSHOT.jar,carbondata_2.10-1.0.0-incubating-shade-hadoop2.6.0-cdh5.7.0.jar

--num-executors 12 --executor-cores 4 --executor-memory 13G
--conf spark.yarn.executor.memoryOverhead=5120
--conf spark.executor.heartbeatInterval=10000000
--conf spark.network.timeout=10000000
</spark-opts>

2.create script 
CREATE TABLE IF NOT EXISTS dmp_trade(id STRING,buyerNick STRING,buyerAlipayNO STRING,clientType
STRING,sellerNick STRING,receiverName STRING,receiverMobile STRING,receiverPhone STRING,receiverCountry
STRING,receiverState STRING,receiverCity STRING,receiverDistrict STRING,receiverTown STRING,receiverAddress
STRING,receiverZip STRING,status STRING,tradeFrom STRING,type STRING,stepTradeStatus STRING,shippingType
STRING,title STRING,buyerMessage STRING,buyerMemo STRING,rxAuditStatus STRING,buyerEmail STRING,picPath
STRING,shopPick STRING,creditCardFee STRING,markDesc STRING,sellerMemo STRING,invoiceName
STRING,invoiceType STRING,tradeAttr STRING,esRange STRING,esDate STRING,osDate STRING,osRange
STRING,o2oSnatchStatus STRING,market STRING,etType STRING,obs STRING,tradeOriginalJson STRING,point
STRING,omniAttr STRING,omniParam STRING,identity STRING,omnichannelParam STRING,assembly STRING,tradeId
BIGINT,itemId BIGINT,platFormId INT,num INT,sellerFlag INT,naSource INT,etShopId INT,forbidConsign
INT,buyerFlag INT,topHold INT,nvoiceKind INT,payment STRING,price STRING,totalFee STRING,discountFee
STRING,postFee STRING,stepPaidFee STRING,adjustFee STRING,buyerCodFee STRING,orderTaxFee STRING,couponFee
STRING,paidCouponFee STRING,sellerRate STRING,buyerRate STRING,postGateDeclare STRING,crossBondedDeclare
STRING,hasBuyerMessage STRING,hasPostFee STRING,isShShip STRING,created TIMESTAMP,payTime
TIMESTAMP,modified TIMESTAMP,endTime TIMESTAMP,consignTime TIMESTAMP,estConTime TIMESTAMP)
STORED BY 'carbondata';

3.carbon.properties
#Mandatory. Carbon Store path
carbon.storelocation=hdfs://master.nascent.com:8020/Opt/CarbonStore
#Base directory for Data files
carbon.ddl.base.hdfs.url=hdfs://master.nascent.com:8020/opt/data
#Path where the bad records are stored
carbon.badRecords.location=/opt/Carbon/Spark/badrecords
#Mandatory. path to kettle home
carbon.kettle.home=/usr/lib/spark/carbonlib/carbonplugins

carbon.load.use.batch.sort=true
enable.unsafe.sort=true
offheap.sort.chunk.size.inmb=1024
carbon.load.batch.sort.size.inmb=450
#File read buffer size used during sorting(in MB) :MIN=1:MAX=100
carbon.sort.file.buffer.size=10
#Rowset size exchanged between data load graph steps :MIN=500:MAX=1000000
carbon.graph.rowset.size=10000
#Number of cores to be used while data loading
carbon.number.of.cores.while.loading=6
#Record count to sort and write to temp intermediate files
carbon.sort.size=500000
#Algorithm for hashmap for hashkey calculation
carbon.enableXXHash=true
#Number of cores to be used for block sort while dataloading
#carbon.number.of.cores.block.sort=7
#max level cache size up to which level cache will be loaded in memory
#carbon.max.level.cache.size=-1
#enable prefetch of data during merge sort while reading data from sort temp files in data loading
#carbon.merge.sort.prefetch=true

#Number of cores to be used while compacting
carbon.number.of.cores.while.compacting=8
#For minor compaction, Number of segments to be merged in stage 1, number of compacted segments to be merged in stage 2.
carbon.compaction.level.threshold=4,3
#default size (in MB) for major compaction to be triggered
carbon.major.compaction.size=1024
#Query Configuration
#Number of cores to be used for loading index into memory
carbon.number.of.cores=8
#Number of records to be in memory while querying :MIN=100000:MAX=240000
carbon.inmemory.record.size=120000
#Improves the performance of filter query
carbon.enable.quick.filter=false
##number of cores to load the blocks in driver
#no.of.cores.to.load.blocks.in.driver=10

#Extra Configuration
##Timestamp format of input data used for timestamp data type.
#carbon.timestamp.format=yyyy-MM-dd HH:mm:ss

##File write buffer size used during sorting.
#carbon.sort.file.write.buffer.size=10485760
##Locking mechanism for data loading on a table
carbon.lock.type=HDFSLOCK
##Minimum no of intermediate files after which sort merged to be started.
#carbon.sort.intermediate.files.limit=20
##space reserved in percentage for writing block meta data in carbon data file
#carbon.block.meta.size.reserved.percentage=10
##csv reading buffer size.
#carbon.csv.read.buffersize.byte=1048576
##To identify and apply compression for non-high cardinality columns
#high.cardinality.value=100000
##maximum no of threads used for reading intermediate files for final merging.
#carbon.merge.sort.reader.thread=3
##Carbon blocklet size. Note: this configuration cannot be changed once store is generated
#carbon.blocklet.size=120000
##number of retries to get the metadata lock for loading data to table
#carbon.load.metadata.lock.retries=3
##Minimum blocklets needed for distribution.
#carbon.blockletdistribution.min.blocklet.size=10
##Interval between the retries to get the lock
#carbon.load.metadata.lock.retry.timeout.sec=5
##Temporary store location, By default it will take System.getProperty("java.io.tmpdir")
#carbon.tempstore.location=/opt/Carbon/TempStoreLoc
##data loading records count logger
#carbon.load.log.counter=500000

##to specify number of segments to be preserved from compaction
#carbon.numberof.preserve.segments=0
##To determine the loads of number of days to be compacted
#carbon.allowed.compaction.days=0
##To enable compaction while data loading
#carbon.enable.auto.load.merge=false
######## Query Configuration ########
##Maximum time allowed for one query to be executed.
max.query.execution.time=60
##Min max is a feature added to enhance query performance. To disable this feature, set it to false.
carbon.enableMinMax=true

##To enable/disable identify high cardinality during first data loading
#high.cardinality.identify.enable=true
##threshold to identify whether high cardinality column
#high.cardinality.threshold=1000000
##Percentage to identify whether column cardinality is more than configured percent of total row count
#high.cardinality.row.count.percentage=80
##The property to set the date to be considered as start date for calculating the timestamp.
#carbon.cutOffTimestamp=2000-01-01 00:00:00
##The property to set the timestamp (ie millis) conversion to the SECOND, MINUTE, HOUR or DAY level.
#carbon.timegranularity=SECOND

> Always OOM error when import large dataset (100 million rows)
> -----------------------------------------------------------
>
>                 Key: CARBONDATA-906
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-906
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-load
>    Affects Versions: 1.0.0-incubating
>            Reporter: Crabo Yang
>         Attachments: carbon.properties
>
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> 	at java.util.concurrent.ConcurrentHashMap$Segment.put(ConcurrentHashMap.java:457)
> 	at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1130)
> 	at org.apache.carbondata.core.cache.dictionary.ColumnReverseDictionaryInfo.addDataToDictionaryMap(ColumnReverseDictionaryInfo.java:101)
> 	at org.apache.carbondata.core.cache.dictionary.ColumnReverseDictionaryInfo.addDictionaryChunk(ColumnReverseDictionaryInfo.java:88)
> 	at org.apache.carbondata.core.cache.dictionary.DictionaryCacheLoaderImpl.fillDictionaryValuesAndAddToDictionaryChunks(DictionaryCacheLoaderImpl.java:113)
> 	at org.apache.carbondata.core.cache.dictionary.DictionaryCacheLoaderImpl.load(DictionaryCacheLoaderImpl.java:81)
> 	at org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCache.loadDictionaryData(AbstractDictionaryCache.java:236)
> 	at org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCache.checkAndLoadDictionaryData(AbstractDictionaryCache.java:186)
> 	at org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache.getDictionary(ReverseDictionaryCache.java:174)
> 	at org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache.get(ReverseDictionaryCache.java:67)
> 	at org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache.get(ReverseDictionaryCache.java:38)
> 	at org.apache.carbondata.processing.newflow.converter.impl.DictionaryFieldConverterImpl.<init>(DictionaryFieldConverterImpl.java:92)
> 	at org.apache.carbondata.processing.newflow.converter.impl.FieldEncoderFactory.createFieldEncoder(FieldEncoderFactory.java:77)
> 	at org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl.initialize(RowConverterImpl.java:102)
> 	at org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl.initialize(DataConverterProcessorStepImpl.java:69)
> 	at org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl.initialize(SortProcessorStepImpl.java:57)
> 	at org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl.initialize(DataWriterProcessorStepImpl.java:79)
> 	at org.apache.carbondata.processing.newflow.DataLoadExecutor.execute(DataLoadExecutor.java:45)
> 	at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD$$anon$2.<init>(NewCarbonDataLoadRDD.scala:425)
> 	at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD.compute(NewCarbonDataLoadRDD.scala:383)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:89)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message