flink-user mailing list archives

From subash basnet <yasub...@gmail.com>
Subject Re: How to convert List to flink DataSet
Date Fri, 12 Feb 2016 16:42:47 GMT
Hello Fabian,

Thank you for the response. I am still stuck on how to iterate over a
DataSet, perform operations on its elements, and return a new, modified
DataSet, the way I currently do with lists.
E.g., currently I am doing the following:
for (Centroid centroid : centroids.collect()) {
    for (Tuple2<Integer, Point> element : clusteredPoints.collect()) {
        // perform necessary operations
    }
    // add elements
}
// return elements list

It would be really nice if I could just get started.
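
The two nested loops above, combined with a condition that matches each point's cluster id against the centroid's id (as in the quoted findOutliers method), have the shape of an equi-join. Below is a minimal plain-Java sketch of that idea. It deliberately avoids the Flink API, and the Centroid and ClusteredPoint classes are hypothetical stand-ins for the thread's Centroid and Tuple2<Integer, Point>. In the DataSet API the same pairing would presumably be expressed as a join on the id field instead of collect() plus loops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NestedLoopAsJoin {
    /** Hypothetical stand-in for the thread's Centroid (2-D only). */
    static final class Centroid {
        final int id;
        final double x;
        final double y;

        Centroid(int id, double x, double y) {
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    /** Hypothetical stand-in for Tuple2<Integer, Point>: cluster id plus coordinates. */
    static final class ClusteredPoint {
        final int clusterId;
        final double x;
        final double y;

        ClusteredPoint(int clusterId, double x, double y) {
            this.clusterId = clusterId;
            this.x = x;
            this.y = y;
        }
    }

    /**
     * Pairs every point with the centroid of its own cluster and returns the
     * Euclidean distances in the input order of the points. Points whose
     * cluster id has no centroid are dropped, as in an inner join.
     */
    static List<Double> distancesToOwnCentroid(List<Centroid> centroids,
                                               List<ClusteredPoint> points) {
        // Build side: index the centroids by id.
        Map<Integer, Centroid> byId = new HashMap<>();
        for (Centroid c : centroids) {
            byId.put(c.id, c);
        }
        // Probe side: look up each point's centroid instead of scanning all centroids.
        List<Double> distances = new ArrayList<>();
        for (ClusteredPoint p : points) {
            Centroid c = byId.get(p.clusterId);
            if (c != null) {
                distances.add(Math.hypot(p.x - c.x, p.y - c.y));
            }
        }
        return distances;
    }

    public static void main(String[] args) {
        List<Centroid> centroids = Arrays.asList(
                new Centroid(1, 0.0, 0.0), new Centroid(2, 10.0, 10.0));
        List<ClusteredPoint> points = Arrays.asList(
                new ClusteredPoint(1, 3.0, 4.0), new ClusteredPoint(2, 10.0, 13.0));
        System.out.println(distancesToOwnCentroid(centroids, points));
    }
}
```

Unlike the nested loops, this sketch emits results in point order rather than grouped by centroid; that difference does not matter once the per-cluster statistics are computed by grouping on the cluster id.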

I have been trying to add an element to the DataSet using *join*, but when I
print the DataSet it still contains only the one initial element; it prints
the same value as the initial set.
for (....) {
    newElement = new Tuple3<Integer, Point, Boolean>();
    dataSetElement.join(env.fromElements(newElement));
    dataSetElement.print();
}

I am unsure whether I am using the right function or using join in the wrong
manner.
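
One likely reason the printed DataSet never changes: DataSet transformations return a new DataSet and leave the one they are called on untouched, and the loop above throws the returned value away. Note also that join pairs elements by key rather than appending; union is the DataSet operator that concatenates two DataSets of the same type. The plain-Java sketch below illustrates only the discarded-result pattern. It uses no Flink classes, and the union helper is a hypothetical stand-in built on immutable lists.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class DiscardedResultSketch {
    /**
     * Hypothetical stand-in for a transformation such as union:
     * builds a NEW list and never mutates its inputs.
     */
    static <T> List<T> union(List<T> left, List<T> right) {
        List<T> merged = new ArrayList<>(left);
        merged.addAll(right);
        return Collections.unmodifiableList(merged);
    }

    /** Mirrors the loop in the message: each result is thrown away. */
    static List<Integer> appendDiscardingResult(List<Integer> initial, int count) {
        List<Integer> data = initial;
        for (int i = 0; i < count; i++) {
            union(data, Collections.singletonList(i)); // result ignored, data unchanged
        }
        return data;
    }

    /** Keeps the result of each call by reassigning the variable. */
    static List<Integer> appendKeepingResult(List<Integer> initial, int count) {
        List<Integer> data = initial;
        for (int i = 0; i < count; i++) {
            data = union(data, Collections.singletonList(i));
        }
        return data;
    }

    public static void main(String[] args) {
        List<Integer> start = Collections.singletonList(100);
        System.out.println(appendDiscardingResult(start, 3));
        System.out.println(appendKeepingResult(start, 3));
    }
}
```

Even with the reassignment, building a distributed DataSet one element at a time from a client-side loop is probably not what is wanted; producing the new elements with a transformation over the existing DataSet avoids the loop altogether.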

Best Regards,
Subash Basnet

On Wed, Feb 10, 2016 at 6:33 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> I would try to do the outlier computation with the DataSet API instead of
> fetching the results to the client with collect().
> If you do that, you can directly use writeAsCsv because the result is
> still a DataSet.
>
> What you have to do is translate your findOutliers method into DataSet
> API code.
>
> Best, Fabian
>
> 2016-02-10 18:29 GMT+01:00 subash basnet <yasubash@gmail.com>:
>
>> Hello Fabian,
>>
>> As written before, the code is:
>>
>> DataSet<Tuple3> fElements =
>>     env.fromCollection(findOutliers(clusteredPoints, finalCentroids));
>> fElements.writeAsCsv(outputPath, "\n", " ");
>> env.execute("KMeans Example");
>>
>> I am very new to Flink, so I am not clear about what you suggested. By
>> option (1), did you mean that I write my own FileWriter here rather than
>> using the *writeAsCsv()* method? As for option (2), I couldn't understand
>> where to compute the outliers. I would like to use the *writeAsCsv()*
>> method, but currently it doesn't perform the write operation and I am
>> unable to understand why.
>>
>> An interesting thing I found is that when I run the *outlierDetection*
>> class from Eclipse, a single file *result* gets written within the kmeans
>> folder, whereas the default *KMeans* class writes a result folder within
>> the kmeans folder, and the files with points are written inside that
>> result folder.
>> I give the necessary paths in the arguments while running.
>> Eg: file:///home/softwares/flink-0.10.0/kmeans/points
>> file:///home/softwares/flink-0.10.0/kmeans/centers
>> file:///home/softwares/flink-0.10.0/kmeans/result 10
>>
>> Now, after I create the runnable jar files for the KMeans and
>> outlierDetection classes and upload them to the *Flink web submission
>> client*, it works fine for *KMeans.jar*: the folder and files get created.
>> But in the case of *outlierDetection.jar*, no file or folder gets written
>> inside kmeans.
>>
>> How is it that the outlier class is able to write the file via Eclipse,
>> but the outlier jar is not able to write via the Flink web submission
>> client?
>>
>>
>> Best Regards,
>> Subash Basnet
>>
>> On Wed, Feb 10, 2016 at 1:58 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Hi Subash,
>>>
>>> I would not fetch the data to the client, do the computation there, and
>>> send it back, just for the purpose of writing it to a file.
>>>
>>> Either 1) pull the results to the client and write the file from there
>>> or 2) compute the outliers in the cluster.
>>> I did not study your code completely, but the two nested loops and the
>>> condition are, for example, a join.
>>>
>>> I would go for option 2, if possible.
>>>
>>> Best, Fabian
>>>
>>>
>>> 2016-02-10 13:07 GMT+01:00 subash basnet <yasubash@gmail.com>:
>>>
>>>> Hello Fabian,
>>>>
>>>> I use the collect() method to get the elements locally and perform
>>>> operations on that and return the result as a collection. The collection
>>>> result is converted to the DataSet in the calling method.
>>>> Below is the code of the *findOutliers* method:
>>>>
>>>> public static List<Tuple3> findOutliers(
>>>>         DataSet<Tuple2<Integer, Point>> clusteredPoints,
>>>>         DataSet<Centroid> centroids) throws Exception {
>>>>     List<Tuple3> finalElements = new ArrayList<Tuple3>();
>>>>     List<Tuple2<Integer, Point>> elements = clusteredPoints.collect();
>>>>     List<Centroid> centroidList = centroids.collect();
>>>>     List<Tuple3<Centroid, Tuple2<Integer, Point>, Double>> elementsWithDistance =
>>>>             new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
>>>>     for (Centroid centroid : centroidList) {
>>>>         elementsWithDistance =
>>>>                 new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
>>>>         double totalDistance = 0;
>>>>         int elementsCount = 0;
>>>>         for (Tuple2<Integer, Point> e : elements) {
>>>>             // compute distance
>>>>             if (e.f0 == centroid.id) {
>>>>                 Tuple3<Centroid, Tuple2<Integer, Point>, Double> newElement =
>>>>                         new Tuple3<Centroid, Tuple2<Integer, Point>, Double>();
>>>>                 double distance = e.f1.euclideanDistance(centroid);
>>>>                 totalDistance += distance;
>>>>                 newElement.setFields(centroid, e, distance);
>>>>                 elementsWithDistance.add(newElement);
>>>>                 elementsCount++;
>>>>             }
>>>>         }
>>>>         // finding mean
>>>>         double mean = totalDistance / elementsCount;
>>>>         double sdTotalDistanceSquare = 0;
>>>>         for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance
>>>>                 : elementsWithDistance) {
>>>>             double distanceSquare = Math.pow(mean - elementWithDistance.f2, 2);
>>>>             sdTotalDistanceSquare += distanceSquare;
>>>>         }
>>>>         double sd = Math.sqrt(sdTotalDistanceSquare / elementsCount);
>>>>         double upperlimit = mean + 2 * sd;
>>>>         double lowerlimit = mean - 2 * sd;
>>>>         // true = outlier
>>>>         Tuple3<Integer, Point, Boolean> newElement =
>>>>                 new Tuple3<Integer, Point, Boolean>();
>>>>         for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance
>>>>                 : elementsWithDistance) {
>>>>             newElement = new Tuple3<Integer, Point, Boolean>();
>>>>             if (elementWithDistance.f2 < lowerlimit
>>>>                     || elementWithDistance.f2 > upperlimit) {
>>>>                 // set as outlier
>>>>                 newElement.setFields(elementWithDistance.f1.f0,
>>>>                         elementWithDistance.f1.f1, true);
>>>>             } else {
>>>>                 newElement.setFields(elementWithDistance.f1.f0,
>>>>                         elementWithDistance.f1.f1, false);
>>>>             }
>>>>             finalElements.add(newElement);
>>>>         }
>>>>     }
>>>>     return finalElements;
>>>> }
>>>>
>>>> I have attached herewith the screenshot of my project structure and
>>>> KMeansOutlierDetection.java file for more clarity.
>>>>
>>>>
>>>> Best Regards,
>>>> Subash Basnet
>>>>
>>>> On Wed, Feb 10, 2016 at 12:26 PM, Fabian Hueske <fhueske@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Subash,
>>>>>
>>>>> how is findOutliers implemented?
>>>>>
>>>>> It might be that you mix up local and cluster computation. All
>>>>> DataSets are processed in the cluster. Please note the following:
>>>>> - ExecutionEnvironment.fromCollection() transforms a client-local
>>>>> collection into a DataSet by serializing it and sending it to the cluster.
>>>>> - DataSet.collect() transforms a DataSet into a collection and ships
>>>>> it back to the client.
>>>>>
>>>>> So, does findOutliers operate on the cluster or on the local client,
>>>>> i.e., does it work with DataSets and send the result back as a collection,
>>>>> or does it first collect the results as a collection and operate on these?
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-02-10 12:13 GMT+01:00 subash basnet <yasubash@gmail.com>:
>>>>>
>>>>>> Hello Stefano,
>>>>>>
>>>>>> Yes, the type casting worked, thank you. But I am not able to write the
>>>>>> DataSet to the file.
>>>>>>
>>>>>> The default code below, which writes the KMeans points along with
>>>>>> their centroid numbers to the file, works fine:
>>>>>> // feed new centroids back into next iteration
>>>>>> DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
>>>>>> DataSet<Tuple2<Integer, Point>> clusteredPoints = points
>>>>>>     // assign points to final clusters
>>>>>>     .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>>>>> if (fileOutput) {
>>>>>>     clusteredPoints.writeAsCsv(outputPath, "\n", " ");
>>>>>>     // since file sinks are lazy, we trigger the execution explicitly
>>>>>>     env.execute("KMeans Example");
>>>>>> }
>>>>>>
>>>>>> But my modified code below, to find the outliers:
>>>>>> // feed new centroids back into next iteration
>>>>>> DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
>>>>>> DataSet<Tuple2<Integer, Point>> clusteredPoints = points
>>>>>>     // assign points to final clusters
>>>>>>     .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>>>>> DataSet<Tuple3> fElements =
>>>>>>     env.fromCollection(findOutliers(clusteredPoints, finalCentroids));
>>>>>> if (fileOutput) {
>>>>>>     fElements.writeAsCsv(outputPath, "\n", " ");
>>>>>>     // since file sinks are lazy, we trigger the execution explicitly
>>>>>>     env.execute("KMeans Example");
>>>>>> }
>>>>>>
>>>>>> It's not writing to the file; the *result* folder does not get
>>>>>> created inside the kmeans folder where my centers and points files are
>>>>>> located. I am only able to print it to the console via *fElements.print();*
>>>>>>
>>>>>> Does it have something to do with *env.execute("")*, which must be
>>>>>> set somewhere in the previous case but not in my case?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best Regards,
>>>>>> Subash Basnet
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 9, 2016 at 6:29 PM, Stefano Baghino <
>>>>>> stefano.baghino@radicalbit.io> wrote:
>>>>>>
>>>>>>> Assuming your ExecutionEnvironment is named `env`, simply call:
>>>>>>>
>>>>>>> DataSet<Tuple3<Integer, Point, Boolean>> fElements =
>>>>>>>     env.fromCollection(finalElements);
>>>>>>>
>>>>>>> Does this help?
>>>>>>>
>>>>>>> On Tue, Feb 9, 2016 at 6:06 PM, subash basnet <yasubash@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello all,
>>>>>>>>
>>>>>>>> I have modified the KMeans code to detect outliers. I have printed
>>>>>>>> the output to the console, but I am not able to write it to a file
>>>>>>>> using the given 'writeAsCsv' method.
>>>>>>>> The problem is that I generate a list of tuples.
>>>>>>>> My List is:
>>>>>>>> List<Tuple3> finalElements = new ArrayList<Tuple3>();
>>>>>>>> Following is the datatype of the elements added to the list:
>>>>>>>> Tuple3<Integer, Point, Boolean> newElement =
>>>>>>>>     new Tuple3<Integer, Point, Boolean>();
>>>>>>>> finalElements.add(newElement);
>>>>>>>> Now I am stuck on how to convert this 'finalElements' to
>>>>>>>> DataSet<Tuple3<Integer, Point, Boolean>> fElements,
>>>>>>>> so that I could use
>>>>>>>> fElements.writeAsCsv(outputPath, "\n", " ");
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Subash Basnet
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> BR,
>>>>>>> Stefano Baghino
>>>>>>>
>>>>>>> Software Engineer @ Radicalbit
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
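
The quoted findOutliers method computes, for each cluster, the mean and the population standard deviation of the point-to-centroid distances and flags every point whose distance lies outside mean ± 2 · sd. Because that statistic depends only on the points of one cluster, it is a per-group computation. The sketch below shows the grouping in plain Java, with no Flink dependency. The Flagged class and the array-based input are hypothetical simplifications of the thread's Tuple3<Integer, Point, Boolean>. In the DataSet API, groupBy on the cluster id followed by reduceGroup looks like the natural counterpart, though that translation is an assumption and is not shown or tested here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ClusterOutlierSketch {
    /** Hypothetical result type: cluster id, distance to own centroid, outlier flag. */
    static final class Flagged {
        final int clusterId;
        final double distance;
        final boolean outlier;

        Flagged(int clusterId, double distance, boolean outlier) {
            this.clusterId = clusterId;
            this.distance = distance;
            this.outlier = outlier;
        }
    }

    /**
     * clusterIds[i] and distances[i] describe point i. Points are grouped by
     * cluster id; within each group a point is an outlier if its distance is
     * outside mean +/- 2 * sd (population standard deviation, as in the thread).
     */
    static List<Flagged> flagOutliers(int[] clusterIds, double[] distances) {
        // Group point indices by cluster id, keeping first-seen cluster order.
        Map<Integer, List<Integer>> membersByCluster = new LinkedHashMap<>();
        for (int i = 0; i < clusterIds.length; i++) {
            membersByCluster.computeIfAbsent(clusterIds[i], k -> new ArrayList<>()).add(i);
        }

        List<Flagged> result = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> group : membersByCluster.entrySet()) {
            List<Integer> members = group.getValue();

            // First pass: mean distance of the group.
            double sum = 0;
            for (int i : members) {
                sum += distances[i];
            }
            double mean = sum / members.size();

            // Second pass: population standard deviation.
            double squaredDeviationSum = 0;
            for (int i : members) {
                squaredDeviationSum += Math.pow(distances[i] - mean, 2);
            }
            double sd = Math.sqrt(squaredDeviationSum / members.size());

            double lowerLimit = mean - 2 * sd;
            double upperLimit = mean + 2 * sd;
            for (int i : members) {
                boolean outlier = distances[i] < lowerLimit || distances[i] > upperLimit;
                result.add(new Flagged(group.getKey(), distances[i], outlier));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] ids = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2};
        double[] dist = {1, 1, 1, 1, 1, 1, 1, 1, 1, 10, 2, 2};
        for (Flagged f : flagOutliers(ids, dist)) {
            System.out.println(f.clusterId + " " + f.distance + " " + f.outlier);
        }
    }
}
```

Because each group is processed independently, nothing in this computation needs the whole data set on the client, which is the point of computing the outliers in the cluster rather than after collect().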
