flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Nam-Luc Tran <namluc.t...@euranova.eu>
Subject Re: kryoException : Buffer underflow
Date Wed, 11 Feb 2015 16:38:42 GMT
Hello Stephan, 

Thank you for your help.

I ensured that all the POJO classes used comply with what you previously
said, and the same exception still occurs. Here is the listing of the classes
Centroid25 and Point25:

/**
 * A {@link Point25} that additionally carries an integer identifier.
 *
 * <p>The 25 coordinates are inherited from {@code Point25}; this class only adds
 * the public {@code id} field and constructors that forward the coordinates to
 * the superclass.
 */
public class Centroid25 extends Point25 {

    /** Identifier of this centroid. */
    public int id;

    /** Creates a centroid via the no-argument {@code Point25} constructor; {@code id} is left at 0. */
    public Centroid25() {}

    /**
     * Creates a centroid from an identifier and 25 explicit coordinate values.
     *
     * @param id identifier stored in {@link #id}
     * @param value0 first coordinate; {@code value1} .. {@code value24} are the
     *     remaining coordinates, forwarded in order to the superclass constructor
     */
    public Centroid25(int id, Double value0, Double value1, Double value2,
            Double value3, Double value4, Double value5,
            Double value6, Double value7, Double value8, Double value9,
            Double value10, Double value11, Double value12,
            Double value13, Double value14, Double value15, Double value16,
            Double value17, Double value18,
            Double value19, Double value20, Double value21, Double value22,
            Double value23, Double value24) {
        super(value0, value1, value2, value3, value4, value5, value6, value7,
                value8, value9, value10, value11,
                value12, value13, value14, value15, value16, value17, value18,
                value19, value20, value21, value22,
                value23, value24);
        this.id = id;
    }

    /**
     * Creates a centroid whose coordinates are copied from an existing point.
     *
     * @param id identifier stored in {@link #id}
     * @param p point whose fields {@code f0} .. {@code f24} are forwarded to the superclass
     */
    public Centroid25(int id, Point25 p) {
        super(p.f0, p.f1, p.f2, p.f3, p.f4,
                p.f5, p.f6, p.f7, p.f8, p.f9,
                p.f10, p.f11, p.f12, p.f13, p.f14,
                p.f15, p.f16, p.f17, p.f18, p.f19,
                p.f20, p.f21, p.f22, p.f23, p.f24);
        this.id = id;
    }

    /**
     * Creates a centroid whose coordinates are taken from a 25-field tuple.
     *
     * <p>The tuple is declared with explicit {@code Double} type arguments: with a
     * raw {@code Tuple25} the fields are typed {@code Object} and cannot be passed
     * to the {@code Double} parameters of the superclass constructor.
     *
     * @param id identifier stored in {@link #id}
     * @param p tuple whose fields {@code f0} .. {@code f24} are forwarded to the superclass
     */
    public Centroid25(int id, Tuple25<
            Double, Double, Double, Double, Double,
            Double, Double, Double, Double, Double,
            Double, Double, Double, Double, Double,
            Double, Double, Double, Double, Double,
            Double, Double, Double, Double, Double> p) {
        super(p.f0, p.f1, p.f2, p.f3, p.f4,
                p.f5, p.f6, p.f7, p.f8, p.f9,
                p.f10, p.f11, p.f12, p.f13, p.f14,
                p.f15, p.f16, p.f17, p.f18, p.f19,
                p.f20, p.f21, p.f22, p.f23, p.f24);
        this.id = id;
    }

    /** Returns the id, a single space, and the superclass string representation. */
    @Override
    public String toString() {
        return id + " " + super.toString();
    }
}

public class Point25{

public Double
f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13,f14,f15,f16,f17,f18,f19,f20,f21,f22,f23,f24
= 0.0;

/** Creates a point without assigning any coordinate; the fields keep their declared initial values. */
public Point25() {}

/**
 * Creates a point from 25 explicit coordinate values.
 *
 * <p>Each {@code valueN} argument is stored, unchanged, in the field {@code fN}.
 */
public Point25(
        Double value0, Double value1, Double value2, Double value3, Double value4,
        Double value5, Double value6, Double value7, Double value8, Double value9,
        Double value10, Double value11, Double value12, Double value13, Double value14,
        Double value15, Double value16, Double value17, Double value18, Double value19,
        Double value20, Double value21, Double value22, Double value23, Double value24) {
    this.f0 = value0;
    this.f1 = value1;
    this.f2 = value2;
    this.f3 = value3;
    this.f4 = value4;

    this.f5 = value5;
    this.f6 = value6;
    this.f7 = value7;
    this.f8 = value8;
    this.f9 = value9;

    this.f10 = value10;
    this.f11 = value11;
    this.f12 = value12;
    this.f13 = value13;
    this.f14 = value14;

    this.f15 = value15;
    this.f16 = value16;
    this.f17 = value17;
    this.f18 = value18;
    this.f19 = value19;

    this.f20 = value20;
    this.f21 = value21;
    this.f22 = value22;
    this.f23 = value23;
    this.f24 = value24;
}

/**
 * Returns the 25 coordinates as a list, in order {@code f0} .. {@code f24}.
 *
 * <p>The list is built with {@link Arrays#asList}, so it is fixed-size and holds
 * the field values as they are at the time of the call.
 *
 * @return a list of exactly 25 elements
 */
public List<Double> getFieldsAsList() {
    return Arrays.asList(
            f0, f1, f2, f3, f4,
            f5, f6, f7, f8, f9,
            f10, f11, f12, f13, f14,
            f15, f16, f17, f18, f19,
            f20, f21, f22, f23, f24);
}

/**
 * Adds another point to this one, coordinate by coordinate, modifying this point in place.
 *
 * @param other point whose coordinates are added to the coordinates of this point
 * @return this point, after the addition
 */
public Point25 add(Point25 other) {
    this.f0 = this.f0 + other.f0;
    this.f1 = this.f1 + other.f1;
    this.f2 = this.f2 + other.f2;
    this.f3 = this.f3 + other.f3;
    this.f4 = this.f4 + other.f4;

    this.f5 = this.f5 + other.f5;
    this.f6 = this.f6 + other.f6;
    this.f7 = this.f7 + other.f7;
    this.f8 = this.f8 + other.f8;
    this.f9 = this.f9 + other.f9;

    this.f10 = this.f10 + other.f10;
    this.f11 = this.f11 + other.f11;
    this.f12 = this.f12 + other.f12;
    this.f13 = this.f13 + other.f13;
    this.f14 = this.f14 + other.f14;

    this.f15 = this.f15 + other.f15;
    this.f16 = this.f16 + other.f16;
    this.f17 = this.f17 + other.f17;
    this.f18 = this.f18 + other.f18;
    this.f19 = this.f19 + other.f19;

    this.f20 = this.f20 + other.f20;
    this.f21 = this.f21 + other.f21;
    this.f22 = this.f22 + other.f22;
    this.f23 = this.f23 + other.f23;
    this.f24 = this.f24 + other.f24;

    return this;
}

/**
 * Divides every coordinate of this point by {@code val}, modifying this point in place.
 *
 * <p>All 25 coordinates are divided. The division is carried out in floating
 * point ({@code val} is promoted to {@code double}), so a zero divisor yields
 * infinities or NaN rather than an exception.
 *
 * @param val divisor applied to each coordinate
 * @return this point, after the division
 */
public Point25 div(long val) {
    f0 /= val;
    f1 /= val;
    f2 /= val;
    f3 /= val;
    f4 /= val;
    f5 /= val;
    f6 /= val;
    f7 /= val;
    f8 /= val;
    f9 /= val;
    f10 /= val;
    f11 /= val;
    f12 /= val;
    f13 /= val;
    f14 /= val;
    f15 /= val;
    f16 /= val;
    f17 /= val;
    f18 /= val;
    f19 /= val;
    f20 /= val;
    f21 /= val;
    f22 /= val;
    f23 /= val;
    f24 /= val;
    return this;
}

public double euclideanDistance(Point25 other) {
List l = this.getFieldsAsList();
List ol = other.getFieldsAsList();
double res = 0;
for(int i=0;i
> I came across an error for which I am unable to retrace the exact
cause.
> Starting from flink-java-examples module, I have extended the KMeans
> example
> to a case where points have 25 coordinates. It follows the exact
same
> structure and transformations as the original example, only with
points
> having 25 coordinates instead of 2.
>
> When creating the centroids dataset within the code as follows the
job
> iterates and executes well:
>
> Centroid25 cent1 = new
Centroid25(ThreadLocalRandom.current().nextInt(0,
> 1000),
>
>
-10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0);
> Centroid25 cent2 = new
Centroid25(ThreadLocalRandom.current().nextInt(0,
> 1000),
>
>
-1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0);
> DataSet centroids = env.fromCollection(Arrays.asList(cent1,
> cent2));
>
> When reading from a csv file containing the following:
>
>
-10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
>
>
-1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
>
> with the following code:
> DataSet> centroids = env
>
> .readCsvFile("file:///home/nltran/res3.csv")
>                                
.fieldDelimiter(",")
>                                
.includeFields("1111111111111111111111111")
>                                
.types(Double.class, Double.class,
> Double.class, Double.class,
> Double.class, Double.class,
>                                                
Double.class,
> Double.class, Double.class, Double.class, Double.class,
> Double.class,
>                                                
Double.class,
> Double.class, Double.class, Double.class, Double.class,
> Double.class,
>                                                
Double.class,
> Double.class, Double.class, Double.class, Double.class,
> Double.class,
>                                                
Double.class).map(p -> {
>                                        
return new
> Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>
>
p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p.f22,p.f23,p.f24);
>                                
}).returns("eu.euranova.flink.Centroid25");
>
>
> I hit the following exception:
>
> 02/11/2015 14:58:27     PartialSolution (BulkIteration (Bulk
> Iteration))(1/1)
> switched to FAILED
> com.esotericsoftware.kryo.KryoException: Buffer underflow
>         at
>
>
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
>         at
com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>         at
>
>
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
>         at
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at
>
>
org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
>         at
>
>
org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
>         at
>
>
org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
>         at
>
org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
>         at
>
>
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>         at
>
>
org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
>         at
>
>
org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
>         at
>
>
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>         at
>
>
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
>         at java.lang.Thread.run(Thread.java:745)
>
> 02/11/2015 14:58:27     Job execution switched to status
FAILING.
> 02/11/2015 14:58:27     CHAIN Map (Map at
main(DoTheKMeans.java:64)) ->
> Map (Map
> at main(DoTheKMeans.java:65))(1/1) switched to CANCELING
> 02/11/2015 14:58:27     Combine (Reduce at
main(DoTheKMeans.java:68))(1/1)
> switched to CANCELING
> 02/11/2015 14:58:27     CHAIN Reduce(Reduce at
main(DoTheKMeans.java:68))
> -> Map
> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING
> 02/11/2015 14:58:27     DataSink(Print to System.out)(1/1)
switched to
> CANCELED
> 02/11/2015 14:58:27     Sync(BulkIteration (Bulk
Iteration))(1/1) switched
> to
> CANCELING
> 02/11/2015 14:58:27     Sync(BulkIteration (Bulk
Iteration))(1/1) switched
> to
> CANCELED
> 02/11/2015 14:58:27     CHAIN Map (Map at
main(DoTheKMeans.java:64)) ->
> Map (Map
> at main(DoTheKMeans.java:65))(1/1) switched to CANCELED
> 02/11/2015 14:58:27     Combine (Reduce at
main(DoTheKMeans.java:68))(1/1)
> switched to CANCELED
> 02/11/2015 14:58:27     CHAIN Reduce(Reduce at
main(DoTheKMeans.java:68))
> -> Map
> (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED
> 02/11/2015 14:58:27     Job execution switched to status FAILED.
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException:
> com.esotericsoftware.kryo.KryoException: Buffer underflow
>         at
>
>
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
>         at
com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>         at
>
>
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
>         at
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at
>
>
org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
>         at
>
>
org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
>         at
>
>
org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
>         at
>
org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
>         at
>
>
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>         at
>
>
org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
>         at
>
>
org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
>         at
>
>
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>         at
>
>
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
>         at java.lang.Thread.run(Thread.java:745)
>
>         at
>
>
org.apache.flink.runtime.client.JobClientListener$$anonfun$receiveWithLogMessages$2.applyOrElse(JobClient.scala:88)
>         at
>
>
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at
>
>
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at
>
>
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at
>
>
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
>         at
>
>
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at
>
>
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
>         at
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at
>
>
org.apache.flink.runtime.client.JobClientListener.aroundReceive(JobClient.scala:74)
>         at
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
>
>
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
>
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
>
>
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The centroid25 data is exactly the same in both cases. Could you
help me
> retrace what is wrong?
>
> Thanks and best regards,
>
> Tran Nam-Luc
>
>
>
> --
> View this message in context:
>
http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/kryoException-Buffer-underflow-tp3760.html
> Sent from the Apache Flink (Incubator) Mailing List archive. mailing
list
> archive at Nabble.com.
>



Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message