Akka Streams – phần 2

Trong bài viết trước, chúng ta đã cùng nhau tìm hiểu về Akka Stream và các thành phần của nó. Trong bài viết này chúng ta sẽ cùng nhau tìm hiểu về các tính năng của Akka Stream. Akka Stream là implement của Reactive Stream.

Sử dụng lại các phần

Trong rất nhiều API của Stream, chúng ta không thể sử dụng lại code cho Source, Flow và Sink. Nhưng thư viện Akka Stream cung cấp cho chúng ta dễ dàng để thực hiện điều này.

val source: Source[Int, NotUsed] = Source(1 to 10) 
source.filter(_ % 2 == 0).runForeach(println) 
source.filter(_ % 2 != 0).runForeach(println)

Trong ví dụ bên trên, Source đã được tạo và sử dụng cho 2 mẫu Stream. Không chỉ riêng source, các thành phần khác có thể được sử dụng lại. Giả sử có một Source đang tạo ra dữ liệu User. Sử dụng cùng một source, một tổ chức muốn tạo 2 stream để thực hiện chọn lọc người dùng căn cứ vào tuổi và chuyển đổi dữ liệu thành bytestring và sau đó ghi vào một file.

Đối với trường hợp này, chúng ta có thể viết một hàm để chuyển đổi user thành bytestring và ghi vào một file. Hàm này chuyển đổi một flow và ghi vào file có thể được sử dụng lại nhiều lần

case class User(id: Int, name: String, age: Int)
val users = List(
  User(1, "Aashrita", 22),
  User(2, "Mark", 12),
  User(3, "Ron", 19),
  User(4, "Adam", 20)
)

val userSource: Source[User, NotUsed] = Source(users)

def writeToFile(filename: String): Sink[User, Future[IOResult]] = {
  Flow[User]
   .map(_.toString)
   .map(ByteString(_))
   .toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)
}

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[User](2))
  userSource ~> bcast.in
  bcast.out(0) ~> Flow[User].filter(_.age >= 18) ~> writeToFile("a.txt")
  bcast.out(1) ~> Flow[User].filter(_.age < 18) ~>  writeToFile("b.txt")
  ClosedShape
})
g.run()

Thời gian dựa trên xử lý

Akka Stream cho phép giới hạn flow của các phần tử trong một khoảng thời gian nhất định. Sử dụng phương thức throttle mà thư viện cung cấp, chúng ta có thể xác định được số các phần tử mà chúng ta muốn truyền trong khoảng thời gian nhất định. Trong ví dụ bên dưới đây, source đang sinh ra một phần tử trong 1 giây.

Source(1 to 10) 
  .throttle(1, 1.second) 
  .runForeach(println)

Back pressure

Nó là tính năng quan trọng nhất dựa trên Reactive Stream. Trước khi tìm hiểu backressure hoạt động như thế nào, hãy xem xét: Tại sao backpressure?

Giả sử source đang sinh ra dữ liệu với tốc độ rất cao và consumer chậm hơn và không thể xử lý dữ liệu với tốc độ nhanh như vậy. Để xử lý các kịch bản như vậy, consumer gửi các thông tin một cách bất đồng bộ và non-blocking đến publisher. Ngoài ra, publisher của stream ví dụ như source đảm bảo rằng nó sẽ không đưa ra dữ liệu nhiều hơn yêu cầu nhận của consumer. Trong những gói tin bất đồng bộ và non-blocking, consumer gửi tốc độ tiêu thụ tại đó nó có thể nhận các phần tử dữ liệu. Giao thức back-pressure đảm bảo rằng publisher sẽ không bao giờ báo hiệu nhiều phần tử hơn nhu cầu.

Thứ tự Stream

Hầu hết tất cả các tính toán trong Akka Stream đều giữ nguyên thứ tự đầu vào của stream. Giả sử A1, A2, A3… An, các phần tử được sinh ra bởi source. Sau khi stream truyền qua một giai đoạn xử lý, các phần tử được xử lý sẽ là B1, B2, B3…Bn. Các thứ tự sẽ được đảm bảo. Nếu Ai xảy ra trước Aj trong source đầu vào, thì các phần tử được xử lý Bi xuất hiện trước Bj sau khi truyền qua giai đoạn xử lý của stream.

Có rất nhiều lợi ích khác nữa khi sử dụng Akka Stream và thư viện này có thể được sử dụng cho một số các trường hợp sử dụng. Để hiểu rõ hơn, tham khảo tài liệu của Akka

You May Also Like

About the Author: Nguyen Dinh Thuc

Leave a Reply

Your email address will not be published.