Tuesday, May 16, 2017

Technology Report: Apache Flink

I. What is Apache Flink?
Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.
More information can be found on the Flink website. Below are some use cases from the website:

  • Optimization of e-commerce search results in real-time: Alibaba’s search infrastructure team uses Flink to update product detail and inventory information in real-time, improving relevance for users.
  • Stream processing-as-a-service for data science teams: King (the creators of Candy Crush Saga) makes real-time analytics available to its data scientists via a Flink-powered internal platform, dramatically shortening the time to insights from game data.
  • Network / sensor monitoring and error detection: Bouygues Telecom, one of the largest telecom providers in France, uses Flink to monitor its wired and wireless networks, enabling a rapid response to outages throughout the country.
  • ETL for business intelligence infrastructure: Zalando uses Flink to transform data for easier loading into its data warehouse, converting complex payloads into relatively simple ones and ensuring that analytics end users have faster access to data.

We can tease out common threads from these use cases. Based on the examples above, Flink is well-suited for:
  • A variety of (sometimes unreliable) data sources: When data is generated by millions of different users or devices, it’s safe to assume that some events will arrive out of the order they actually occurred–and in the case of more significant upstream failures, some events might come hours later than they’re supposed to. Late data needs to be handled so that results are accurate.
  • Applications with state: When applications become more complex than simple filtering or enhancing of single data records, managing state within these applications (e.g., counters, windows of past data, state machines, embedded databases) becomes hard. Flink provides tools so that state is efficient, fault tolerant, and manageable from the outside so you don’t have to build these capabilities yourself.
  • Data that is processed quickly: There is a focus in these use cases on real-time or near-real-time scenarios, where insights from data should be available at nearly the same moment that the data is generated. Flink is fully capable of meeting these latency requirements when necessary.
  • Data in large volumes: These programs would need to be distributed across many nodes (in some cases, thousands) to support the required scale. Flink can run on large clusters just as seamlessly as it runs on small ones.
II. Download and Setup:
  • Requirement: Java must be installed, and the Java bin folder must be included in the %PATH% variable.
After downloading and extracting the package, go to the bin folder and run start-local.bat to start the Flink job manager.
After that, we can connect to localhost:8081 to check that everything works.
The above covers download and installation for Windows.
For Mac, we can just install Homebrew and type $ brew install apache-flink
For Linux, we can extract the same package and run ./bin/start-local.sh from the Flink folder.
III. Extract, Transform and Load Data:
Before working with complex, big data like the Yelp dataset, I installed the Flink quickstart and tried to run it locally.
There are several ways to download the quickstart.
For Mac, we can use: $ curl https://flink.apache.org/q/quickstart.sh | bash
Another way is to download Maven and use the Maven archetype as follows:
      $ mvn archetype:generate \ 
       -DarchetypeGroupId=org.apache.flink \ 
       -DarchetypeArtifactId=flink-quickstart-java \ 
       -DarchetypeVersion=1.2.0
Or we can use an IDE for Flink; I prefer IntelliJ IDEA, which offers a free one-month evaluation.
Finished Job for QuickStart:

As the quickstart comes with a project skeleton, we can easily import the Flink libraries and work with the BatchJob or StreamJob files in the main folder.
I used csvkit to extract and transform the data needed for this test; from the Yelp business file, I kept only the city, review_count, and name of each business in a file called business.csv.
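The extraction can be done with csvkit's csvcut; a command along these lines, assuming the business data is already in CSV form (the input file name is a placeholder):
      $ csvcut -c city,review_count,name yelp_business.csv > business.csv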

After that, we can load the data in the IDE; I immediately wrote the data back out to CSV files using a Flink data sink to see if the file loaded correctly.
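A minimal sketch of that load-and-write check with the DataSet API (paths and tuple types are my assumptions):

    // imports: org.apache.flink.api.java.{ExecutionEnvironment, DataSet}, org.apache.flink.api.java.tuple.Tuple3
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple3<String, Long, String>> businesses = env
            .readCsvFile("business.csv")                  // city, review_count, name
            .ignoreFirstLine()                            // skip the schema/header row
            .types(String.class, Long.class, String.class);
    businesses.writeAsCsv("output/business-check");       // the Flink data sink
    env.execute("Load check");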
The program ran successfully and output a folder with 8 CSV files (one per parallel subtask), each about 550 KB.
Now we can delete all the files and test some transformations with Flink using filter(), map(), and reduce().
First of all, we need to check that we loaded the data correctly by counting the number of lines in the file.
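One way to do this: DataSet.count() triggers execution and returns the number of records.

    System.out.println(businesses.count());    // number of loaded records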



The program printed out 144072 lines; the header line with the schema is not counted, so we got the correct number.

The first query I tried finds, for each city, the top 5 businesses with the highest review_count.
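A sketch of that query with grouping and group-sorting (tuple layout assumed as before: f0 = city, f1 = review_count, f2 = name):

    // extra import: org.apache.flink.api.common.operators.Order
    DataSet<Tuple3<String, Long, String>> top5PerCity = businesses
            .groupBy(0)                          // group by city
            .sortGroup(1, Order.DESCENDING)      // sort each group by review_count
            .first(5);                           // keep the 5 most-reviewed per city
    top5PerCity.writeAsCsv("output/top5");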

The results folder again had 8 files of similar size, about 55 KB each; the file count comes from Flink's default parallelism, with each parallel subtask writing its own output file.

Next, I added another transformation, filter, to get the top-reviewed businesses in Las Vegas.
I tried to do it all at once but got an error because the IDE did not recognize my static function, so I commented it out and filtered before grouping and sorting.
Above is the result of that query; everything is stored in file 2 instead of being divided evenly across 8 files as before.
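The filter-before-group version looks roughly like this (same assumed tuple layout; a lambda stands in for the static function that failed):

    DataSet<Tuple3<String, Long, String>> lasVegasTop = businesses
            .filter(b -> b.f0.equals("Las Vegas"))   // keep only Las Vegas rows
            .groupBy(0)
            .sortGroup(1, Order.DESCENDING)
            .first(5);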
Next, I tried to extract and transform data from the whole business file, without using csvkit.
The important line here is .includeFields with a mask of 1s and 0s. The parameter has 16 positions: 1 means the field is included, 0 means it is not. So I put 1 in the 8th, 9th, and 10th positions to include city, review_count, and name.
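Roughly like this, assuming city, review_count, and name sit in positions 8-10 of a 16-field file:

    DataSet<Tuple3<String, Long, String>> businesses = env
            .readCsvFile("yelp_business.csv")          // the whole business file
            .includeFields("0000000111000000")         // keep only fields 8, 9, 10
            .ignoreFirstLine()
            .types(String.class, Long.class, String.class);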
Another problem I encountered while loading data was with the review file. First, I tried to load it the same way as the business file and got the error "Row too short": some rows in the file do not match the schema. I added .ignoreInvalidLines() and then checked the number of loaded lines.
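The reader chain then looks something like this (the column choice is an assumption):

    DataSet<Tuple2<String, Double>> reviews = env
            .readCsvFile("yelp_review.csv")
            .ignoreInvalidLines()                      // silently skip rows that do not fit the schema
            .types(String.class, Double.class);        // business_id, stars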
Only 2353660 lines were loaded (just over 50%), so I tried using csvgrep to get business_id and stars, then loaded the file again.

Now 4153150 lines were loaded, so we successfully extracted and loaded the important data that we need.
The Checkin, User, and Tip files were loaded the same way as Business without any problems.
The last query I worked on with Flink's DataSet API outputs the top 100 businesses with the most stars.
To do that, I first need to sum all the stars in the review dataset that share the same business_id.

To sum all the stars, we use the reduce() transformation and implement a StarsCounter class based on it.
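A minimal sketch of that reducer (the body is my assumption of what StarsCounter does):

    // extra imports: org.apache.flink.api.common.functions.ReduceFunction, org.apache.flink.api.java.tuple.Tuple2
    public static class StarsCounter implements ReduceFunction<Tuple2<String, Double>> {
        @Override
        public Tuple2<String, Double> reduce(Tuple2<String, Double> a, Tuple2<String, Double> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);    // same business_id, stars added up
        }
    }

    DataSet<Tuple2<String, Double>> reviewSum = reviews.groupBy(0).reduce(new StarsCounter());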
Then I join the two datasets by business_id (field 0 in both) and output the business name (field 3 of business) and total stars (field 1 of review_sum) using my own JoinBusinessReview class, which implements JoinFunction.
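A sketch of the join plus a global top-100, assuming the business tuple is (business_id, city, review_count, name):

    // extra imports: org.apache.flink.api.common.functions.JoinFunction, org.apache.flink.api.java.tuple.Tuple4
    public static class JoinBusinessReview implements
            JoinFunction<Tuple4<String, String, Long, String>, Tuple2<String, Double>, Tuple2<String, Double>> {
        @Override
        public Tuple2<String, Double> join(Tuple4<String, String, Long, String> business,
                                           Tuple2<String, Double> review) {
            return new Tuple2<>(business.f3, review.f1);   // (name, total stars)
        }
    }

    DataSet<Tuple2<String, Double>> top100 = businesses
            .join(reviewSum).where(0).equalTo(0)           // join on business_id
            .with(new JoinBusinessReview())
            .sortPartition(1, Order.DESCENDING).setParallelism(1)  // one partition for a global sort
            .first(100);
    top100.writeAsCsv("output/top100", "\n", "\t");        // tab as field delimiter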

We can see the following output with field delimiter set to "\t":
IV. Table API and DataStream API:
Flink also has the Table API, a high-level API similar to SQL.
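A small sketch of what a per-city aggregation might look like with the Table API (field names here are my own, reusing env and businesses from above):

    // imports: org.apache.flink.table.api.{Table, TableEnvironment}, org.apache.flink.table.api.java.BatchTableEnvironment
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    Table business = tableEnv.fromDataSet(businesses, "city, reviewCount, name");
    Table reviewsPerCity = business
            .groupBy("city")
            .select("city, reviewCount.sum as totalReviews");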
Flink's main strength is the DataStream API. Below is the sample program from Flink that takes an input stream on port 9999 and emits output every 5 seconds.
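That sample is essentially Flink's SocketWindowWordCount example; a sketch of it:

    // imports: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
    //          org.apache.flink.streaming.api.datastream.DataStream,
    //          org.apache.flink.streaming.api.windowing.time.Time,
    //          org.apache.flink.api.common.functions.FlatMapFunction, org.apache.flink.util.Collector
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple2<String, Integer>> counts = senv
            .socketTextStream("localhost", 9999)           // read lines from port 9999
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.split("\\s")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            })
            .keyBy(0)                                      // key by word
            .timeWindow(Time.seconds(5))                   // emit results every 5 seconds
            .sum(1);                                       // count per word per window
    counts.print();
    senv.execute("Socket Window WordCount");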
V. Optimization:
Quality: we can use a GroupReduce function together with a Java Set, adding records into the Set so that it eliminates duplicate data for us.
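A sketch of such a deduplicating GroupReduce (the record type is an assumption); it would be applied as data.groupBy(0).reduceGroup(new Dedup()):

    // extra imports: org.apache.flink.api.common.functions.GroupReduceFunction, java.util.{HashSet, Set}
    public static class Dedup implements GroupReduceFunction<Tuple2<String, String>, Tuple2<String, String>> {
        @Override
        public void reduce(Iterable<Tuple2<String, String>> values, Collector<Tuple2<String, String>> out) {
            Set<Tuple2<String, String>> seen = new HashSet<>();
            for (Tuple2<String, String> value : values) {
                if (seen.add(value)) {       // add() returns false for duplicates
                    out.collect(value);
                }
            }
        }
    }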
Performance: partitioning in Flink can optimize parallel processing, and we can also use the rebalance() function to spread data evenly across all parallel partitions.
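For example, on the reviewSum dataset from earlier:

    // hash-partition so equal business_ids land on the same subtask
    DataSet<Tuple2<String, Double>> partitioned = reviewSum.partitionByHash(0);
    // or redistribute records round-robin to even out skewed partitions
    DataSet<Tuple2<String, Double>> balanced = reviewSum.rebalance();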
VI. Conclusion:
Flink has both the DataSet API and the DataStream API and excels at streaming data. We can use Flink with the following structure:
Flink takes data from an input stream, processes it with real-time queries, and outputs to a live data view; it then transfers that data into the data warehouse (the batch layer), where we can run complicated queries with MapReduce-style transformations. Both layers output data as CSV files, which can be stored and analyzed later within the batch layer.
Taking another, closer look at the Flink use cases in big corporations: most of them use Flink to process and analyze real-time data. Below is one last picture of Flink's advantages and why we should use Flink.