Apache Kafka với Scala

Apache Kafka là nền tảng streaming phân tán mã nguồn mở được sử dụng để xây dựng các pipeline dữ liệu thời gian thực và các ứng dụng streaming. Nó có khả năng mở rộng theo chiều ngang, chịu lỗi, hoạt động nhanh chóng và chạy trên production của hàng nghìn công ty.

Trước khi giới thiệu về Apache Kafka, các pipeline dữ liệu được sử dụng rất phức tạp và mất thời gian. Việc chia tách các pipeline streaming là rất cần thiết cho tất cả phía tiêu thụ. Bạn có thể hình dung được sự phức tạp như hình bên dưới đây

Apache Kafka giải quyết được vấn đề này và cung cấp pipeline tổng quát có khả năng chịu lỗi, có thể mở rộng và sử dụng đơn giản. Hiên tại có một pipeline duy nhất cần thiết để phục vụ nhiều phía tiêu dùng, giống như hình bên dưới đây

Trong bài viết này chúng ta sẽ cùng nhau bắt đầu với Apache Kafka, hiểu thuật ngữ cơ bản và làm thế nào để tạo các producer và consumer của Kafka sử dụng api của scala.

Apache Kafka là dự án mã nguồn mở, ban đầu được tạo bởi Linkedin, được thiết kế là một service log đảm bảo được khả năng phân tán, phân vùng và nhân rộng. Nó cung cấp các chức năng của một hệ thống truyền tin.

Topic và các bản tin.

Topic trong Kafka là nơi lưu trữ tất cả các bản tin được tạo ra. Các bản tin là đơn vị của dữ liệu, có thể là các mảng byte và các object bất kỳ có thể được lưu trữ theo định dạng bất kỳ.

Bản tin có 2 thành phần: key và value. Key tượng trưng dữ liệu về bản tin, còn value tượng trưng cho nội dung của bản tin.

Apache Kafka nguồn cung cấp các bản tin được sắp xếp thành một topic. Hãy coi nó như một loại bản tin. Topic có thể được chia thành một số phân vùng như hình bên dưới đây

Apache Kafka sử dụng phân vùng để mở rộng topic trên nhiều server cho producer viết. Một cluster Kafka bao gồm một hoặc nhiều server được gọi là brokers. Mỗi broker này lưu trữ một hay nhiều phân vùng trên đó.

Producer và Consumer

Kafka cung cấp Producer API và Consumer API.  Producer được sử dụng để đưa ra các bản tin cho các topic được lưu trữ trong các phân vùng topic khác nhau. Mỗi phân vùng topic này được sắp xếp, một chuỗi các bản tin kiểu immutable liên tục được nối vào.

Producer ánh xạ từng bản tin mà nó muốn tạo ra cho topic. Producer là client tạo ra các bản ghi lên cluster của Kafka và nên nhớ rằng nó là một thread-safe. Producer client kiểm soát các phân vùng mà nó tạo ra các bản tin cho.

Consumer có thể đăng ký với các topic và xử lý nguồn cấp dữ liệu của các bản tin được phát hành trong thời gian thực. Kafka giữ lại tất cả các bản tin được tạo ra bất kể chúng được tiêu thụ hay chưa trong khoảng thời gian được cấu hình. Hãy xem hình bên dưới đây để hiểu rõ hơn về điều này.

Ở đây chúng ta có nhiều Producer, chúng tạo ra bản tin cho topic trên broker khác nhau và từ đó các Consumer đọc từ bất kỳ topic mà chúng đã đăng ký.

Apache Kafka có thể trải rộng một phân vùng topic đơn trên nhiều broker, cho phép mở rộng theo chiều ngang. Bằng cách trải rộng các phân vùng của topic trên nhiều broker, các consumer có thể đọc từ topic đơn song song.

Đây là những giới thiệu cơ bản các thuật ngữ phổ biến được sử dụng khi làm việc với Kafka. Tiếp theo, chúng ta sẽ tiếp tục và thử tìm hiểu cách tạo producer và consumer trong Kafka.

Như điều kiện cần, chúng ta nên có zookeeper và Kafka server đang chạy. Bạn có thể tham khảo quickstart này để cài đặt một node đơn Kafka đơn trên máy local.

Giả sử rằng bạn đã có một server được khởi chạy, chúng ta sẽ bắt đầu xây dựng một ứng dụng producer-consumer đơn giản, producer sẽ tạo ra các bản tin trong một Kafka topic và consumer có thể đăng ký topic và nhận các bản tin trong thời gian thực.

Thêm dependency thư viện vào trong project sbt:

libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.0"

Hãy xem đoạn code dưới đây, chúng ta sẽ đưa ra các bản tin cho topic “quick-start”

import java.util.Properties
import org.apache.kafka.clients.producer._

class Producer {

  def main(args: Array[String]): Unit = {
    writeToKafka("quick-start")
  }

  def writeToKafka(topic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String](topic, "key", "value")
    producer.send(record)
    producer.close()
  }
}

Cùng lúc này, chúng ta có thể có consumer đang chạy và đang đăng ký với topic “quick-start” và hiển thị các bản tin

import java.util
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Properties
import scala.collection.JavaConverters._

class Consumer {

  def main(args: Array[String]): Unit = {
    consumeFromKafka("quick-start")
  }

  def consumeFromKafka(topic: String) = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator)
        println(data.value())
    }
  }
}

 

You May Also Like

About the Author: Nguyen Dinh Thuc

Leave a Reply

Your email address will not be published.