Akka Stream là thư viện được sử dụng để xử lý và chuyển đổi luồng dữ liệu. Trong bài viết này chúng ta sẽ cùng tìm hiểu về các thành phần trong Akka Stream. Akka Stream là một implement của Reactive Stream và sử dụng không gian buffer hạn chế và tính chất này được biết như là boundedness
Để sử dụng Akka Stream, thêm dependency bên dưới đây vào trong build.sbt:
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.15"
Stream là một luồng dữ liệu mà liên quan đến việc di chuyển và chuyển đổi dữ liệu. Một phần tử là đơn vị xử lý của Stream. Akka Stream gồm có Source, Flow và Sink, chúng ta sẽ cùng nhau tìm hiểu rõ hơn ngay sau đây.
Source
Nó có đúng một đầu ra và chịu trách nhiệm tạo ra dữ liệu. Các phần tử được sinh ra bởi Source.
Có rất nhiều cách để implement Source. Source có thể được implement sử dụng một iterator, range, future… Sau đây là ví dụ về một vài cách chúng ta có thể sử dụng để implement Source
//Tạo một source trống val emptySource = Source.empty //Source tạo các phần tử sử dụng range val sourceUsingRange: Source[Int, NotUsed] = Source(1 to 100) //Source tạo các phần tử từ list val sourceUsingList: Source[Int, NotUsed] = Source(List(1, 2, 3)) //Source tạo các phần tử từ future val sourceFromFuture: Source[Int, NotUsed] = Source.fromFuture(Future(2)) //Source tạo một phần tử đơn val singleSource: Source[String, NotUsed] = Source.single("1") //Source tạo các phần tử trên repeat val repeatedSource: Source[Int, NotUsed] = Source.repeat(5)
Trong ví dụ bên trên, Source được tham số hoá với 2 giá trị. Tham số đầu tiên chỉ định kiểu dữ liệu Source sẽ sinh ra. Tham số còn lại xác định các thông tin phụ trợ mà Source tạo ra khi bắt đầu chạy nó. Trong ví dụ ở trên akka.NotUsed
đã được sử dụng vì Source trong các trường hợp này không tạo ra bất kỳ thông tin nào.
Flow
Flow có một đầu vào và một đầu ra. Nó lấy dữ liệu từ Source và áp dụng một vài xử lý và chuyển đổi trên các phần tử và trả về các phần tử đã được xử lý cho Sink. Dữ liệu được tạo ra bởi Source có thể được chọn lọc, được chuyển đổi như bên dưới đây
val flowWithFilter = Flow[Int].filter(_ > 0) val flowWithMap = Flow[Int].map(_.toString)
Sink
Sink có đúng một đầu vào nhận các phần tử từ Flow. Sink cũng được tham số hoá với 2 tham số. Tham số đầu tiên chỉ định kiểu dữ liệu mà Sink sẽ nhận và tham số còn lại định nghĩa kiểu của thông tin phụ trợ. Sink có nhiều hình thức khác nhau như bên dưới đây
//Sink thêm toàn bộ phần tử của stream sử dụng phương thức fold val sinkAddingElements: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) // Sink trả về một Future như giá trị cụ thể, chứa phần tử đầu tiên của stream val sinkReturningFirstElement: Sink[Int, Future[Int]] = Sink.head // Sink dùng một stream mà không làm bất cứ điều gì với các phần tử val sinkIgnoringElements: Sink[Int, Future[Done]] = Sink.ignore // Sink thực hiện một cuộc gọi side-effecting cho tất cả các phần tử của stream val sinkPrintingElements: Sink[String, Future[Done]] = Sink.foreach[String](println(_))
Runnable graph
Điều này thể hiện các phần tử sẽ chạy qua các Source và các Sink khác nhau như thế nào. Chú ý rằng flow kết hợp với các kết quả của Source trong Source của chính nó, và flow kết hợp với một Sink hoạt động như một Sink. Một Runable graph nên có một Source và Sink đơn ở cả hai đầu như biểu diễn ở hình bên trên. Sau khi Runnable graph tạo xong, nó sẵn sàng để chạy
implicit val system: ActorSystem = ActorSystem("akka-streams") implicit val materializer: ActorMaterializer = ActorMaterializer() val sourceProducingElements: Source[Int, NotUsed] = Source(1 to 100) val flowTransformingData: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 0).map(_ * 2) val sinkPrintingElements: Sink[Int, Future[Done]] = Sink.foreach[Int](println(_)) val runnableGraph = source.via(flowTransformingData).toMat(sinkPrintingElements)(Keep.right) runnableGraph.run()
Trong ví dụ bên trên, Source tạo ra các số interger từ 1 đến 100. Flow được tạo để giúp chọn lọc các số chẵn và nhân đôi nó. Sau đấy có một Sink ( được gọi là sinkPrintingElements
) thực hiện in các phần tử ra console. Kết nối giữa Source, Flow và Sink bằng cách sử dụng RunnableGraph[Int, Future[Done]]
. Để thực hiện tạo Stream, phương thức run()
được gọi thông qua runnableGraph
. Runnable Graph cũng có thể được tạo mà không cần sử dụng flow, nghĩa là Source và Sink là các thành phần cần thiết được sử dụng để chạy một Stream.
Cũng để ý rằng, stream yêu cầu sử dụng một materializer
để thực thi. materializer
có nhiệm vụ phân bổ các tài nguyên như là Actor để chạy Stream. ActorMaterializer
có thể được cung cấp một biến implicit (biến ẩn) trong phạm vi hoặc biến rõ ràng khi chạy stream.
Chạy stream sử dụng Runnable graph là một cách để thực thi stream. Có một cách khác là sử dụng function có sẵn như phương thức runWith()
mà nhận Sink như một tham số đầu vào. Trong ví dụ bên trên, phương thức runForEach()
đã được sử dụng để gọi phương thức runWith()
trong implement của nó. Hãy xem ví dụ sau đây sử dụng cách tiếp cận này
source .filter(_ % 2 == 0) .map(_* 2) .runForeach(println)
Một điểm cần lưu ý nữa là chương trình không bị dừng ngay cả khi toàn bộ stream đã được xử lý. Nguyên nhân là bởi ActorSystem bở vì ActorSystem sẽ không bao giờ bị chấm dứt.
Akka cung cấp một số các đặc điểm khác như là sử dụng lại code, xử lý dựa theo thời gian, back-pressure(backbone của Reactive Streams).