AWS streaming data using Scala Spark

AWS streaming data is a continuous flow of records that arrives batch-wise, or in bulk, from different producers or devices.

On AWS, streaming data can be handled with AWS Kinesis, which provides live streaming data and keeps records available for 24 hours or longer, depending on the retention period you opt for.

We will see a real example of AWS streaming data by creating an AWS Kinesis data stream and sending data to it using Scala Spark.

First, we set up Kinesis. To do so we need an AWS account, so make sure you log in to AWS or create a new account, then follow the steps below.



Steps for creating an AWS Kinesis data stream for AWS streaming data using Scala Spark

Create AWS Account

After creating an AWS account, search for Kinesis in the services search bar as shown below and open the first option, i.e. Kinesis.

Searching for Kinesis in the AWS console

Navigate to the Kinesis service

Once you are inside the Kinesis service, use the Kinesis Data Streams option for this AWS streaming data demonstration; we will discuss the other related services in separate articles.

Select the Kinesis Data Streams option and click the Create data stream button.

Creating the Kinesis data stream

Create data stream

Next, give your stream a specific name that you can refer to later, and keep all other options at their defaults, such as leaving the data stream capacity mode set to on-demand.

Naming the data stream and setting its capacity mode

Scroll down and press the Create data stream button; you will see a confirmation as shown below. A programmatic alternative to the console steps is sketched right after the screenshot.

The Kinesis data stream created successfully
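If you prefer to create the stream from code rather than through the console, the AWS SDK used later in this article also exposes a createStream call. Below is a minimal sketch under a few assumptions: the stream name my-demo-stream and the region us-east-1 are placeholders, credentials are assumed to be already configured as described in the credentials section below, and it uses the long-standing provisioned shard-count parameter rather than on-demand mode.

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.CreateStreamRequest

object CreateKinesisStream {
 def main(args: Array[String]): Unit = {
  // Assumes credentials are already available to the default provider chain (see the credentials section below)
  val kinesisClient = AmazonKinesisClientBuilder.standard()
    .withRegion("us-east-1") // placeholder: use your own region
    .build()

  // Request a stream with one provisioned shard; the console's on-demand mode serves the same purpose for this demo
  val request = new CreateStreamRequest()
    .withStreamName("my-demo-stream") // placeholder: use the stream name you chose in the console
    .withShardCount(1)

  kinesisClient.createStream(request)
  println("Stream creation requested; wait until the stream status becomes ACTIVE before writing to it")
 }
}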

Creating a user under IAM

A user is created to give our code access to AWS services, which is where the AWS access key and secret key come from. Go to the search bar on the AWS home page as shown below and click on the IAM service.

Finding the IAM service from the AWS console search bar

Click on IAM (Manage access to AWS resources), and create a user by navigating to the Users section as shown below.

The Users section in the IAM console

After you create a user you will get an AWS access key and an AWS secret key; save these two somewhere safe on your system.

Setting up the AWS credentials in your system environment

There are three ways to do this:

  1. Use the .aws folder in your user directory on the C: drive.
  2. Set the keys as system environment variables.
  3. Use the AWS standard() builder method and specify the credentials directly in your code, either through variables or by reading them from a config file; the producer code later in this article uses this approach.

We will see examples in this article (a short sketch of the first two approaches follows below), and also in upcoming articles.
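As a minimal sketch of the first two approaches, the snippet below builds a Kinesis client with the SDK's default credentials provider chain, which picks up the keys from the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables or from the credentials file inside the .aws folder, so no secrets appear in the code. It uses the same AWS SDK classes as the producer code later in this article; the region and object name are only illustrative.

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder

object KinesisClientFromDefaultCredentials {
 def main(args: Array[String]): Unit = {
  // No keys in the code: the default provider chain checks environment variables,
  // Java system properties, and the .aws credentials file, among other sources.
  val kinesisClient = AmazonKinesisClientBuilder.standard()
    .withRegion("us-east-1") // placeholder: use your stream's region
    .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
    .build()

  println("Kinesis client created: " + kinesisClient)
 }
}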

AWS streaming data using Kinesis created successfully

We are done creating the AWS streaming data source; now we need to create a Producer and a Consumer for the Kinesis stream using Scala Spark.

We could make the Producer and Consumer in Python or PySpark as well, but a stream like this can have billions or trillions of records flowing in one go if you are handling many devices or an organization like Netflix or Comcast, or several different data sources. To control this adequately and handle termination exceptions, we prefer Spark jobs, which can cope with data at that scale.

We will now see an example in Scala Spark of how to create a Producer and a Consumer.

AWS streaming data using Scala Spark in IntelliJ

Producer for Kinesis Stream using Scala Spark in IntelliJ

Step 1: Set up all the required applications and libraries; to do that, please see this article: Spark setup in Windows using IntelliJ IDEA.

Step 2: After Step 1 is done, open IntelliJ IDEA and create a new Maven project.

Step 3: Inside the project, rename the java folder to scala as shown below.

Renaming the java folder to scala in the project structure

Step 4: Right-click on the scala folder and create an object file with a specific name; in my case I have named it KinesisStream.scala, matching the object name used in the code below.

Step 5: Now add the code as shown below.

AWS Kinesis Producer Code for AWS streaming data using Scala Spark

pom.xml

 <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>

        <!--   Kinesis streaming artifacts (the Scala suffix and Spark version must match the spark-core and spark-sql artifacts above)     -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kinesis-asl_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.14.8</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-producer</artifactId>
            <version>0.14.12</version>
        </dependency>
        <dependency>
            <groupId>software.amazon.kinesis</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>2.4.1</version>
        </dependency>
        <!--        -->
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sdk-core</artifactId>
            <version>2.17.191</version>
        </dependency>
    </dependencies>

AWS Kinesis Producer Code



import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import java.nio.ByteBuffer

object KinesisStream {
 def main(args: Array[String]): Unit = {

  val spark = SparkSession
    .builder
    .appName("Kinesis Writer")
    .master("local[*]")
    .getOrCreate()

  val myStructureData = Seq {
    Row("Solutiongigs", "Example code")
  }

  val myStructureSchema = new StructType()
    .add("Name", DataTypes.StringType)
    .add("Description", DataTypes.StringType)

  val df  = spark.createDataFrame(
    spark.sparkContext.parallelize(myStructureData),myStructureSchema)

  //    We have created a DataFrame df; now we will convert it to JSON and send it to Kinesis.
  //    Note: if your data is already in JSON form there is no need to convert it, you can send it to Kinesis directly.


  val to_json = df.toJSON
  to_json.foreach(x => println("The data is " + x))
   
     // Please add your stream name here 
  val streamName: String = "Your_Stream_name_here"

  val kinesisClient = AmazonKinesisClientBuilder.standard()
    .withRegion("region_name") // replace with your stream's region, e.g. us-east-1
    .withCredentials(new AWSStaticCredentialsProvider(
      new BasicAWSCredentials("your_aws_access_key_id", "your_aws_secret_key"))) // access key ID first, then secret key
    .build()

  println(s"Putting results onto stream" + streamName)
  var lastSequenceNumber: String = null
  val time = System.currentTimeMillis


  // Collect the (small) JSON Dataset to the driver before writing: the Kinesis client
  // created above is not serializable, so it cannot be used inside an executor-side
  // foreach. For large datasets, create the client inside foreachPartition instead.
  to_json.collect().foreach { x =>
   val data = s"$x"
   val request = new PutRecordRequest()
     .withStreamName(streamName)
     .withData(ByteBuffer.wrap(data.getBytes()))
     .withPartitionKey("integrationKey")
   val result = kinesisClient.putRecord(request)
   println(result)
  }

  spark.stop()
 }
}
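Only the Producer is shown above, and the Consumer will get a deeper treatment in an upcoming article, but since the pom.xml already declares the spark-streaming-kinesis-asl dependency, here is a minimal sketch of what a matching Consumer could look like. The stream name, region, and checkpoint application name are placeholders, it relies on the default AWS credentials provider chain, and the artifact's Scala and Spark versions must match the rest of your build.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KinesisReader {
 def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("Kinesis Reader").setMaster("local[*]")
  val ssc = new StreamingContext(conf, Seconds(10))

  // Placeholders: use the same stream name and region as the Producer above
  val streamName = "Your_Stream_name_here"
  val region = "region_name"

  val kinesisStream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName(streamName)
    .endpointUrl(s"https://kinesis.$region.amazonaws.com")
    .regionName(region)
    .initialPosition(new KinesisInitialPositions.Latest())
    .checkpointAppName("kinesis-reader-demo") // also the name of the DynamoDB table used for checkpoints
    .checkpointInterval(Seconds(10))
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .build()

  // Each record arrives as a byte array; turn it back into the JSON string the Producer sent
  kinesisStream.map(bytes => new String(bytes)).print()

  ssc.start()
  ssc.awaitTermination()
 }
}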

Conclusion

In this article we discussed AWS streaming data using Scala Spark and how to set up Spark in IntelliJ with a sample Spark session example. Stay tuned for more articles like this; if you have any queries, please comment below. Thank you, and happy learning!
