AWS streaming data is a continuous flow of records arriving, one by one or in batches, from different producers or devices.
In AWS, streaming data can be handled with AWS Kinesis, which gives you live streaming data retained for 24 hours by default, or longer depending on the retention period you opt for.
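As a quick preview of working with a stream from code, here is a minimal sketch that raises a stream's retention period using the AWS SDK for Java v1 (the same SDK used later in this article); the region, stream name, and hour count are placeholder values.

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest

object IncreaseRetention {
  def main(args: Array[String]): Unit = {
    // Credentials are resolved from the default chain (covered below)
    val kinesis = AmazonKinesisClientBuilder.standard()
      .withRegion("us-east-1") // placeholder region
      .build()

    // Raise retention from the 24-hour default to 48 hours (placeholder values)
    kinesis.increaseStreamRetentionPeriod(
      new IncreaseStreamRetentionPeriodRequest()
        .withStreamName("Your_Stream_name_here") // placeholder stream name
        .withRetentionPeriodHours(48))
  }
}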
We will see a real example of AWS streaming data by creating an AWS Kinesis data stream and sending data to it using Scala Spark.
First, create a data stream in AWS Kinesis. To do so you need an AWS account, so make sure you log in to AWS or create a new account, then follow the steps below.
Steps for creating an AWS Kinesis stream for AWS streaming data using Scala Spark
Create AWS Account
After creating an AWS account, search for Kinesis in the services search bar as shown below, and open the first option, i.e., Kinesis.
Navigate to the Kinesis service
Once you are inside the Kinesis service, use the Kinesis Data Streams option for this AWS streaming data demonstration; we will discuss the other related services in other articles.
Select the Kinesis Data Streams option and click the Create data stream button.
Create data stream
Next, give your stream a specific name that you can refer to later, and keep all other options at their defaults; for example, keep the Data stream capacity mode as on-demand.
Scroll down and press the Create data stream button; you will see a confirmation screen like the one shown below.
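If you prefer to script this step instead of clicking through the console, here is a minimal sketch that creates a stream with the AWS SDK for Java v1; the stream name, region, and shard count are placeholders, so adjust them to match what you chose in the console. Note that this sketch uses provisioned capacity with an explicit shard count, whereas the console's on-demand mode needs no shard count.

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.CreateStreamRequest

object CreateKinesisStream {
  def main(args: Array[String]): Unit = {
    // Uses the default credentials chain (.aws folder, environment variables, etc.)
    val kinesis = AmazonKinesisClientBuilder.standard()
      .withRegion("us-east-1") // placeholder region
      .build()

    val request = new CreateStreamRequest()
      .withStreamName("Your_Stream_name_here") // placeholder stream name
      .withShardCount(1)                       // provisioned mode, one shard

    kinesis.createStream(request)
    println("CreateStream request sent; the stream takes a moment to become ACTIVE")
  }
}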
Creating a user under an IAM role
A user is created to provide access to AWS services through code, so we need an AWS_ACCESS_KEY and an AWS_SECRET_KEY. Go to the search bar on the AWS home page as shown below and click on the IAM service.
Click on IAM (manage access to AWS resources) and create a user by navigating to the Users section as shown below.
After you create a user you will get an AWS_ACCESS_KEY and an AWS_SECRET_KEY; save these two somewhere safe on your system.
Setting up the AWS credentials configuration in your system environment
There are three ways to do this, as sketched below:
- One is by using the .aws folder in your system, which holds the credentials file.
- The second way is setting the credentials as system environment variables.
- The third one is using the AWS standard() builder method and specifying the credentials directly in your code, either through variables or by keeping them in a config file and reading them into your code.
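Here is a minimal sketch of these options using the SDK v1 classes that appear later in this article. The region is a placeholder, and the key values shown are placeholders too; never commit real keys to code.

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder

object CredentialOptions {
  def main(args: Array[String]): Unit = {
    // Options 1 and 2: the default provider chain checks the .aws folder
    // (~/.aws/credentials) and then the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
    // environment variables for you
    val clientFromChain = AmazonKinesisClientBuilder.standard()
      .withRegion("us-east-1") // placeholder region
      .withCredentials(new DefaultAWSCredentialsProviderChain())
      .build()

    // Option 3: pass the keys directly (placeholders shown);
    // BasicAWSCredentials takes the access key id first, then the secret key
    val clientFromCode = AmazonKinesisClientBuilder.standard()
      .withRegion("us-east-1")
      .withCredentials(new AWSStaticCredentialsProvider(
        new BasicAWSCredentials("your_aws_access_key_id", "your_aws_secret_key")))
      .build()
  }
}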
We will see examples in this article and also in upcoming articles.
AWS streaming data using Kinesis created successfully
We are done creating the AWS streaming data source; now we should create a Producer and a Consumer for the Kinesis stream using Scala Spark.
We can write the Producer and Consumer in Python or PySpark as well, but a stream like this can have billions or trillions of records flowing in at once if you are ingesting from many devices or handling an organization like Netflix or Comcast with many data sources. To control this adequately and handle termination exceptions, we prefer Spark jobs, which can handle data at that scale.
We will see an example in Scala Spark of how we can create a Producer and a Consumer.
AWS streaming data using Scala Spark in IntelliJ
Producer for Kinesis Stream using Scala Spark in IntelliJ
Step 1: Set up all required applications and libraries; to do that, please see this article: Spark setup in Windows using IntelliJ IDEA.
Step 2: After Step 1 is done, open IntelliJ IDEA and create a new Maven project.
Step 3: Inside the project, rename the java folder to scala as shown below.
Step 4: Right-click on scala and create an object file with a specific name; in my case I have named it kinesisWriter.scala.
Step 5: Now continue with the code as shown below.
AWS Kinesis Producer Code for AWS streaming data using Scala Spark
Add the following dependencies to your pom.xml. Note that all Spark artifacts must share the same Scala version suffix, so spark-core and spark-sql are aligned here to _2.12 to match the Kinesis streaming artifact:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <!-- Kinesis streaming artifacts -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>1.14.8</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-kinesis-producer</artifactId>
        <version>0.14.12</version>
    </dependency>
    <dependency>
        <groupId>software.amazon.kinesis</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>sdk-core</artifactId>
        <version>2.17.191</version>
    </dependency>
</dependencies>
AWS Kinesis Producer Code
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import java.nio.ByteBuffer

object KinesisStream {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("Kinesis Writer")
      .master("local[*]")
      .getOrCreate()

    val myStructureData = Seq(
      Row("Solutiongigs", "Example code")
    )
    val myStructureSchema = new StructType()
      .add("Name", DataTypes.StringType)
      .add("Description", DataTypes.StringType)
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(myStructureData), myStructureSchema)

    // We have created a DataFrame df; now we convert it to JSON and send it to Kinesis.
    // Note: if your data is already in JSON form there is no need to convert it,
    // you can send it to Kinesis directly.
    // collect() brings the JSON strings to the driver; the Kinesis client below is
    // not serializable, so records must be sent from the driver rather than from a
    // distributed foreach.
    val records = df.toJSON.collect()
    records.foreach(x => println("The data is " + x))

    // Please add your stream name here
    val streamName: String = "Your_Stream_name_here"

    // BasicAWSCredentials takes the access key id first, then the secret key
    val kinesisClient = AmazonKinesisClientBuilder.standard()
      .withRegion("region_name")
      .withCredentials(new AWSStaticCredentialsProvider(
        new BasicAWSCredentials("your_aws_access_key_id", "your_aws_secret_key")))
      .build()

    println(s"Putting results onto stream $streamName")
    records.foreach { x =>
      val request = new PutRecordRequest()
        .withStreamName(streamName)
        .withData(ByteBuffer.wrap(x.getBytes()))
        .withPartitionKey("integrationKey")
      val result = kinesisClient.putRecord(request)
      println(result)
    }
  }
}
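The Consumer will get its own walkthrough in an upcoming article, but as a preview, here is a minimal consumer sketch using the spark-streaming-kinesis-asl artifact from the pom above. The app name, region, endpoint, and stream name are placeholders, and the initial position and intervals are just reasonable defaults rather than settings from this article.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KinesisReader {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Kinesis Reader").setMaster("local[*]")
    // Batch interval of 10 seconds; tune this for your throughput
    val ssc = new StreamingContext(conf, Seconds(10))

    // Credentials come from the default chain (.aws folder or environment variables)
    val stream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName("Your_Stream_name_here")                    // placeholder
      .endpointUrl("https://kinesis.us-east-1.amazonaws.com") // placeholder
      .regionName("us-east-1")                                // placeholder
      .initialPosition(new KinesisInitialPositions.Latest())
      .checkpointAppName("kinesis-reader-app") // placeholder; also names the DynamoDB checkpoint table
      .checkpointInterval(Seconds(10))
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .build()

    // Each record arrives as raw bytes; decode back to the JSON strings we produced
    stream.map(bytes => new String(bytes)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}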
Conclusion
In this article we discussed AWS streaming data using Scala Spark and how to set up Spark in IntelliJ with a sample Spark session example. Stay tuned for more articles like this; if you have any queries, please comment down below. Thank you, and happy learning!