Sat 23 February 2019
By Haven
In Technology.
tags: Kafka Avro Spark Hive
This article combines high-level architectural design and hands-on scripting experience from a Kafka POC project.
Driver
The first step is to create the driver. In the driver we use SparkConf to set parameters so the Spark cluster manager can allocate resources on the cluster. We also need a SparkContext: when the configuration is passed to it, the SparkContext asks the cluster manager to allocate resources for executors. The resource manager coordinates the master node and worker nodes (a master-slave model) to launch executors; for example, when we use Spark Streaming to publish or consume data, RDD partitions are distributed across different worker nodes.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf().setAppName("SparkDriverName")
val sc = new SparkContext(conf)
val sqlc = new HiveContext(sc)
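As a minimal sketch of the resource parameters mentioned above (the executor settings are hypothetical placeholders, not values from the POC; in practice the master URL is usually supplied by spark-submit rather than hard-coded), the configuration could look like this:

// Hypothetical resource settings the cluster manager uses when launching executors.
val tunedConf = new SparkConf()
  .setAppName("SparkDriverName")
  .set("spark.executor.instances", "4")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")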
DAO
The second step is to create the DAO. In Spark's model we use HiveContext to run SQL queries. If the data comes from a local file, we read it from the local path and register it as a temp table; otherwise we read the Hive table DBName.TableName and register that.
Local CSV: sqlc.read.format("com.databricks.spark.csv").option("header", "true").load(LocalCSVPath).registerTempTable(tableName)
or
Hive table: sqlc.table(DBName + "." + TableName).registerTempTable(tableName)
Then,
sqlc.table(tableName)
Hive has a metastore service that stores metadata, including table relationships and partition information, on the master node, while the data itself is stored in HDFS. Mostly we create internal (managed) tables, where access is controlled by Hive: dropping the table deletes both the metadata and the data. With an external table, dropping the table deletes only the metadata.
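A short sketch of the difference (the database, table, and HDFS path names here are hypothetical):

// Managed (internal) table: Hive owns metadata and data; DROP TABLE removes both.
sqlc.sql("CREATE TABLE db.events_internal (id INT, payload STRING)")

// External table: Hive tracks only metadata; DROP TABLE leaves the HDFS files in place.
sqlc.sql("CREATE EXTERNAL TABLE db.events_external (id INT, payload STRING) LOCATION 'hdfs:///data/events'")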
Dataframe
With the data access object we can use Spark SQL to create dataframes; the business logic is implemented in this step. We use expr to pass a SQL query as a string variable and transform the input dataframe into the output dataframe. This part needs research and testing, since Spark SQL is not exactly the same as Oracle SQL.
import org.apache.spark.sql.functions.{col, expr}

val SQLExpression = "SQLQuery"
InputDataframe.as("DfName").join(InputDataframe2, Seq("ColumnName"), "left_outer").select(col("ColumnName"), expr(SQLExpression))
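As a concrete, purely illustrative stand-in for SQLQuery (the column names below are hypothetical, not from the POC), an expression might look like this; anything ported from Oracle should be re-tested, since built-in functions and syntax differ in places:

// Hypothetical expression string evaluated by expr against the joined dataframe.
val SQLExpression = "CASE WHEN amount > 100 THEN 'HIGH' ELSE 'LOW' END AS amount_band"
val OutputDataframe = InputDataframe.select(col("ColumnName"), expr(SQLExpression))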
Java Object
Avro has a JSON-like data model and can represent complex data structures that carry the business logic. Its advantages include a direct mapping to JSON and good bindings for a wide variety of programming languages. We can download the open-source avro-tools jar to generate a Java class automatically from an Avro schema. With that Java class, a Jackson mapper can then parse the JSON content of the dataframe into Java objects.
java -jar avro-tools-1.8.x-cdhxxx.jar compile schema AvroSchemaName.avsc FolderName
val JavaObj = mapper.readValue(OutputDataframe, classOf[JavaClassName])
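A minimal sketch of the mapping step, assuming Jackson's ObjectMapper and that each dataframe row is first converted to a JSON string with toJSON (variable names follow the snippet above):

import com.fasterxml.jackson.databind.ObjectMapper

val mapper = new ObjectMapper()

// readValue expects JSON text, so serialize each row before binding it
// to the Avro-generated class.
OutputDataframe.toJSON.collect().foreach { json =>
  val JavaObj = mapper.readValue(json, classOf[JavaClassName])
}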
Serialize
After we register the Avro schema on the Confluent platform, we use Avro's data serialization system to convert the Java object into a compact binary format, which is faster and easier to publish. The Kafka Avro serializer is passed to the producer as a property, as shown below.
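A sketch of the producer configuration, assuming Confluent's KafkaAvroSerializer; the broker and schema registry URLs are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("schema.registry.url", "http://schema-registry:8081")

val Producer = new KafkaProducer[String, JavaClassName](props)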
Publish
The last step is to publish. Spark Streaming accumulates messages over a short interval into a DStream rather than processing them one by one, and then publishes them. Inside each DStream there are RDDs, and each RDD is composed of partitions. Messages are divided across partitions; each message within a partition is assigned an offset, which maintains ordering and tells a consumer where to resume reading.
Since RDD partitions are distributed across multiple worker nodes, transformations such as filter/groupBy run lazily on the workers, while actions such as collect/count/first/top trigger execution and bring results back to the driver.
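As a small illustrative example (the column name is hypothetical, and col comes from org.apache.spark.sql.functions as above):

// Transformation: lazy, runs on the workers once an action is triggered.
val filtered = Dataframe.filter(col("ColumnName").isNotNull)
// Action: triggers execution and returns a result to the driver.
val rowCount = filtered.count()

The publish loop itself iterates over each partition's records on the workers: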
import org.apache.kafka.clients.producer.ProducerRecord

Dataframe.foreachPartition { records =>
  records.foreach { record =>
    // Wrap the Avro-generated object in a ProducerRecord for the topic and send synchronously.
    val SerializedData = new ProducerRecord[String, JavaClassName](TopicName, JavaObj)
    Producer.send(SerializedData).get
  }
}
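For the streaming case described above, here is a minimal sketch with a queue-backed DStream standing in for the real source; the 10-second batch interval and the sample messages are assumptions:

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val queue = mutable.Queue[RDD[String]](sc.parallelize(Seq("msg1", "msg2")))
val stream = ssc.queueStream(queue)

// Each micro-batch arrives as an RDD; here the records are just printed,
// but in the POC they would be published as in the loop above.
stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach(record => println(record))
  }
}

ssc.start()
ssc.awaitTermination()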