Akka stream – phần 1

Akka Stream là thư viện được sử dụng để xử lý và chuyển đổi luồng dữ liệu. Trong bài viết này chúng ta sẽ cùng tìm hiểu về các thành phần trong Akka Stream. Akka Stream là một implement của Reactive Stream và sử dụng không gian buffer hạn chế và tính chất này được biết như là boundedness

Để sử dụng Akka Stream, thêm dependency bên dưới đây vào trong build.sbt:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.15"

Stream là một luồng dữ liệu mà liên quan đến việc di chuyển và chuyển đổi dữ liệu. Một phần tử là đơn vị xử lý của Stream. Akka Stream gồm có Source, Flow và Sink, chúng ta sẽ cùng nhau tìm hiểu rõ hơn ngay sau đây.

Source

Nó có đúng một đầu ra và chịu trách nhiệm tạo ra dữ liệu. Các phần tử được sinh ra bởi Source.

Có rất nhiều cách để  implement Source. Source có thể được implement sử dụng một iterator, range, future… Sau đây là ví dụ về một vài cách chúng ta có thể sử dụng để implement Source

//Tạo một source trống
val emptySource = Source.empty

//Source tạo các phần tử sử dụng range
val sourceUsingRange: Source[Int, NotUsed] = Source(1 to 100)

//Source tạo các phần tử từ list
val sourceUsingList: Source[Int, NotUsed] = Source(List(1, 2, 3))

//Source tạo các phần tử từ future
val sourceFromFuture: Source[Int, NotUsed] = Source.fromFuture(Future(2))

//Source tạo một phần tử đơn
val singleSource: Source[String, NotUsed] = Source.single("1")

//Source tạo các phần tử trên repeat
val repeatedSource: Source[Int, NotUsed] = Source.repeat(5)

Trong ví dụ bên trên, Source được tham số hoá với 2 giá trị. Tham số đầu tiên chỉ định kiểu dữ liệu Source sẽ sinh ra. Tham số còn lại xác định các thông tin phụ trợ mà Source tạo ra khi bắt đầu chạy nó. Trong ví dụ ở trên akka.NotUsed đã được sử dụng vì Source trong các trường hợp này không tạo ra bất kỳ thông tin nào.

Flow

Flow có một đầu vào và một đầu ra. Nó lấy dữ liệu từ Source và áp dụng một vài xử lý và chuyển đổi trên các phần tử và trả về các phần tử đã được xử lý cho Sink. Dữ liệu được tạo ra bởi Source có thể được chọn lọc, được chuyển đổi như bên dưới đây

val flowWithFilter = Flow[Int].filter(_ > 0)
val flowWithMap = Flow[Int].map(_.toString)

Sink

Sink có đúng một đầu vào nhận các phần tử từ Flow. Sink cũng được tham số hoá với 2 tham số. Tham số đầu tiên chỉ định kiểu dữ liệu mà Sink sẽ nhận và tham số còn lại định nghĩa kiểu của thông tin phụ trợ. Sink có nhiều hình thức khác nhau như bên dưới đây

//Sink thêm toàn bộ phần tử của stream sử dụng phương thức fold
val sinkAddingElements: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

// Sink trả về một Future như giá trị cụ thể, chứa phần tử đầu tiên của stream
val sinkReturningFirstElement: Sink[Int, Future[Int]] = Sink.head

// Sink dùng một stream mà không làm bất cứ điều gì với các phần tử
val sinkIgnoringElements: Sink[Int, Future[Done]] = Sink.ignore

// Sink thực hiện một cuộc gọi side-effecting cho tất cả các phần tử của stream
val sinkPrintingElements: Sink[String, Future[Done]] = Sink.foreach[String](println(_))

Runnable graph

Điều này thể hiện các phần tử sẽ chạy qua các Source và các Sink khác nhau như thế nào. Chú ý rằng flow kết hợp với các kết quả của Source trong Source của chính nó, và flow kết hợp với một Sink hoạt động như một Sink. Một Runable graph nên có một Source và Sink đơn ở cả hai đầu như biểu diễn ở hình bên trên. Sau khi Runnable graph  tạo xong, nó sẵn sàng để chạy

implicit val system: ActorSystem = ActorSystem("akka-streams")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val sourceProducingElements: Source[Int, NotUsed] = Source(1 to 100)
val flowTransformingData: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 0).map(_ * 2)
val sinkPrintingElements: Sink[Int, Future[Done]] = Sink.foreach[Int](println(_))

val runnableGraph = source.via(flowTransformingData).toMat(sinkPrintingElements)(Keep.right)
runnableGraph.run()

Trong ví dụ bên trên, Source tạo ra các số interger từ 1 đến 100. Flow được tạo để giúp chọn lọc các số chẵn và  nhân đôi nó. Sau đấy có một Sink ( được gọi là sinkPrintingElements) thực hiện in các phần tử ra console. Kết nối giữa Source, Flow và Sink bằng cách sử dụng RunnableGraph[Int, Future[Done]]. Để thực hiện tạo Stream, phương thức run() được gọi thông qua runnableGraph. Runnable Graph  cũng có thể được tạo mà không cần sử dụng flow, nghĩa là Source và Sink là các thành phần cần thiết được sử dụng để chạy một Stream.

Cũng để ý rằng, stream yêu cầu sử dụng một materializer để thực thi. materializer có nhiệm vụ phân bổ các tài nguyên như là Actor để chạy Stream. ActorMaterializer có thể được cung cấp một biến implicit (biến ẩn) trong phạm vi hoặc biến rõ ràng khi chạy stream.

Chạy stream sử dụng Runnable graph là một cách để thực thi stream. Có một cách khác là sử dụng function có sẵn như phương thức runWith() mà nhận Sink như một tham số đầu vào.  Trong ví dụ bên trên, phương thức runForEach() đã được sử dụng để gọi phương thức runWith() trong implement của nó. Hãy xem ví dụ sau đây sử dụng cách tiếp cận này

source
.filter(_ % 2 == 0)
.map(_* 2)
.runForeach(println)

Một điểm cần lưu ý nữa là chương trình không bị dừng ngay cả khi toàn bộ stream đã được xử lý. Nguyên nhân là bởi ActorSystem bở vì ActorSystem sẽ không bao giờ bị chấm dứt.

Akka cung cấp một số các đặc điểm khác như là sử dụng lại code, xử lý dựa theo thời gian, back-pressure(backbone của Reactive Streams).

You May Also Like

About the Author: Nguyen Dinh Thuc

Leave a Reply

Your email address will not be published.