Hầu hết tất cả các ngông ngữ lập trình hiện nay đều có cách diễn đạt Future – Promise sử dụng để lập trình concurrent. Bài viết này không phải mục đích để làm cho bạn cảm thẩy tẻ nhạt rằng tại sao chúng ta phải trừu tượng hoá concurrent ở mức cao hơn. Thay vào đó bài viết sẽ tập trung vào trình bày về cách tiếp cận của Scala với Future.
scala.concurrent.Future biểu diễn một kiểu, giá trị trong Scala. Nó thường là kết quả phép tính chạy đồng thời hoặc song song
Diễn tả một cách đơn giản, một đoạn code của chúng đang thực hiện một cách tuần tự, nhưng khi chúng ta đặt chúng trong Future thì nó sẽ thực hiện và chạy bất đồng bộ. Future sẽ xử lý giúp chúng ta, thông qua qua đó chúng ta có thể lấy được giá trị trả về sau khi đã được tính toán
Bài viết sẽ tập trung diễn tả cấu trúc cơ bản của Future và cách xử lý và lấy gía trị của nó bằng cách blocking wait hoặc callback.
Tạo một tính toán bất đồng bộ
Tạo một tính toán chạy bất đồng bộ bằng Future là tương đối đơn giản. Chúng ta chỉ việc đặt logic vào trong hàm apply của Future:
val aFuture: Future[Int] = Future { //Some massively huge super important computation }
Ví dụ, tạo một oneFuture trả về giá trị bằng 1 sau khi delay 1 giây
val oneFuture: Future[Int] = Future { Thread.sleep(1000) 1 }
Nhưng trước tiên chúng ta sẽ tìm hiểu một chút về hàm apply của Future
def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T]
Bên trên là thông tin về hàm apply và nó cho phép truyền tham số ẩn executor với kiểu là ExcutionContext bằng cách dùng implicit của Scala. Khi một đoạn code dùng để xây dựng một Future, các tính toán sẽ chạy trên một ExcutionContext. ExcutionContext là đồng nghĩa với một ThreadPool và khi một Future được bắt đầu, nó được chạy trên các thread khác nhau.
Scala đã cung cấp sẵn một static global ExecutionContext trong scala.concurrent.ExecutionContext.global mà chúng ta sẽ sử dụng nó ngay bây giờ trong bài viết này
Nên toàn bộ code sẽ giống như thế này:
class PromisingFutures{ import scala.concurrent.ExecutionContext.Implicits.global val oneFuture: Future[Int] = Future { Thread.sleep(1000) 1 } ... ...
Đoạn code bên trên sử dụng SCALA.CONCURRENT.EXECUTIONCONTEXT.GLOBAL global ExecutionContext rất là tiện khi sử dụng. Tuy nhiên, ThreadPool về cơ bản là một ForkJoinPool. Không có gì sai với điều này cả và ForkJoinPool là tuyệt vời cho các tính toán trực tiếp, ngắn nhưng nó không được khuyên dùng cho Blocking IO như Database hoặc gọi đến webservice (những tính toán mất nhiều thời gian hay đơn giản là những kết nối bên ngoài JVM).
Tin tốt là chúng ta vẫn có thể sử dụng Future và giải quyết một cách đơn giản bằng cách chia ra thành các ThreadPool mà không là ForkJoinPool, một ThreadPool không đổi ví dụ một lựa chọn tốt như sau:
implicit lazy val fixedThreadPoolExecutionContext: ExecutionContext = { val fixedThreadPool: ExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors * 2) //or some fixed number ExecutionContext.fromExecutor(fixedThreadPool) }
Giá trị và trạng thái của Future
Trước khi chúng ta tìm hiểu cách để lấy được giá trị của Future, thì hãy xem các Future thay đổi trạng thái và khi nào hoặc giá trị nào tương ứng với những trạng thái đó (state).
Future chỉ có 2 trạng thái đó là: Not Completed (không hoàn thành) và Completed (Hoàn thành).
Hãy xem hình bên dưới.
Khi chúng ta truyền vào một đoạn code logic để tính toán, kết quả trả về mà chúng ta có thể thu được từ Future là Option[Try[T]].
- Not Completed tức là kết quả tính toán vẫn chưa chạy xong và giá trị trả về vì thế sẽ là None
- Completed thì kết quả sẽ là Some(Try[T]), tức là kết quả chúng ta thu được sẽ như sau:
- Kết quả tính toàn được thực hiện thành công (Success)
- Có lỗi xảy ra và thu được một Exception
Chú ý rằng Try là một kiểu trong Scala nó có 2 giá trị trả về Success (thành công) và Failure ( lỗi xảy ra)
Ví dụ sau đây là một trường hợp trả về Success
def checkState(): Unit = { println("Before the job finishes") Thread.sleep(500) println(s"Completed : ${oneFuture.isCompleted}, Value : ${oneFuture.value}") println("After the job finishes") Thread.sleep(1100) println(s"Completed : ${oneFuture.isCompleted}, Value : ${oneFuture.value}") }
Như đoạn code ví dụ ở bên trên thì bạn biết rằn oneFuture sẽ bị delay 1 giây nên nó sẽ hoàn thành sau thời gian này. Hàm checkState sẽ delay 500ms trước nên sau thời điểm này chúng ta check trạng thái của oneFuture sẽ là Not Completed và giá trị của nó sẽ là None
Output for 500 ms: Before the job finishes Completed : false, Value : None
Sau khi delay 1 giây, đoạn code ở bên trên tiếp tục delay 1100 ms, sau đây kiểm tra lại trạng thái và value thì lúc này kết quả trả về thì trạng thái của nó sẽ là Completed và giá trị sẽ là một Some(Success(“result”)). Đơn giản là vì khi chúng ta kiểm tra trạng thái và giá trị của Future thì hàn oneFuture đã thực hiện xong
Output for 1100 ms: After the job finishes Completed : true, Value : Some(Success(1))
Bây giờ thì chúng ta đã hình dung được cách chạy Future rồi phải không, tiếp theo sẽ là cách để chúng ta có thể xử lấy việc lấy được các giá trị của nó
Lấy giá trị trả về của Future
Có 2 cách để lấy được giá trị trả về của Future khi nó thực hiện xong: Blocking wait và callback
1. Blocking wait sử dụng AWAIT.RESULT
Hàm scala.concurrent.Await có cú pháp như sau:
@throws(classOf[Exception]) def result[T](awaitable: Awaitable[T], atMost: Duration): T = blocking(awaitable.result(atMost)(AwaitPermission))
Tham số đầu vào của nó là một Awaitable Trait, cái mà Future kế thừa nên chúng ta có thể truyền vào một Future.
trait Future[+T] extends Awaitable[T]
Tham số đầu vào thứ 2 là atMost mà dùng để chỉ định rằng thời gian tối đa mà hàm có thể chờ một Future kết thúc chạy logic và trả về một giá trị. Nếu hết thời gian mà atMost được đặt mà chúng ta vần chưa thu được giá trị trả về thì nó sẽ throw một Exception là java.util.concurrent.TimeoutException
Nếu Future hoàn thành thì Await.result sẽ đưa ra một giá trị trả về mà chúng ta cần. Nếu Future hoàn thành nhưng kết quả trả về là môt Throwable, thì chúng ta thu được một Exception
Sử dụng Await.result trong production code thường gây chúng ta cảm giác thất vọng, nhưng kiến trúc này hết sức tiện lợi khi chúng ta viết test code cho Future
Chúng ra sẽ cùng thử 2 Future, một là oneFuture ( như ở trên) và một là oneDangerousFuture mà sẽ throw Exception
Code
Code sẽ được thực hiện như dưới đây:
val oneFuture: Future[Int] = Future { Thread.sleep(1000) 1 } val oneDangerousFuture=Future{ Thread.sleep(2000) throw new SomeComputationException("Welcome to the Dark side !") } case class SomeComputationException(msg: String) extends Exception(msg)
Test cases
Chúng ta sẽ có 3 test case như sau:
- Test case đầu tiên sẽ chạy với trường hợp thành công, tính toán sẽ chạy mất 1 giây và chúng ta sẽ sử dụng một duration (blocking tối đa) là 2 giây. Chúng ta chắc chắn sẽ thu được một kết quả Completed và trả về là thành công. Kết quả kiểm tra của test case này nên trả về là true
- Test case thứ 2 sẽ chạy với trường hợp Exception xảy ra. Hàm Future sẽ throw ra một SomeComputationException và chúng ta sẽ kiểm tra test case này xem nó có nhận được đúng Exception này không
- Test case cuối cùng, chúng ta sẽ sử dụng một duration (blocking tối đa) là 500ms trong khi logic tính toán của chúng ta kết thúc sau 1 giây. Do đó chúng ta sẽ thu được kết quả trả về của Await.result là một Exception: TimeoutException
class PromisingFutureTest extends FunSpec with Matchers { describe("A PromisingFuture") { it("should hold a Int value if the Await.result is called after the Future completes") { val promisingFuture = new PromisingFutures() val oneFuture = promisingFuture.oneFuture //Takes 1 second to compute val intValue = Await.result(oneFuture, 2 seconds) intValue should be(1) } it("should propagate the Exception to the callee if the computation threw an exception") { val promisingFuture = new PromisingFutures() val oneDangerousFuture = promisingFuture.oneDangerousFuture //throws exception intercept[SomeComputationException] { val intValue = Await.result(oneDangerousFuture, 2 seconds) } } it("should throw a TimeOutException exception when an Await.result's atMost parameter is lesser than the time taken for the Future to complete") { val promisingFuture = new PromisingFutures() val oneDelayedFuture = promisingFuture.oneFuture //Takes 1 second to compute intercept[TimeoutException] { Await.result(oneDelayedFuture, 500 millis) } } } }
Callback
Cách tốt nhất và rõ ràng để thu được giá trị của Future là dùng Callback. Có 3 kiểu Callback khác nhau trên một Future: onSuccess, onFailure và được kết hợp trong onComplete
- onSuccess callback sẽ chỉ được gọi khi mà Future hoàn thành và trả về giá trị thành công
- onFailure callback sẽ chỉ được gọi khi mà Future hoàn thành và giá trị trả về là một Exception
- onComplete là kết hợp của onSuccess và onFailure. Giá trị trả về của nó là một Try[T] bằng cách lấy từ một Option[Try[T]]
def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit
Chú ý rằng tất cả các callback đều trả về môt Unit nghĩa rằng kết quả trả về của nó sẽ là nhiều kiểu khác nhau và là một side-effecting
Bây giờ chúng ta sẽ cùng tìm hiểu xem onComplete hoạt động như thế nào. Chúng ta sẽ có một hàm gọi là printFuture dùng để println ra giá trị của Future khi nó Completed. Sau đó hay truyền hai hàm oneFuture và oneDangerousFuture vào
class PromisingFutures { ... ... def printFuture[T](future: Future[T]): Unit = future.onComplete { case Success(result) => println(s"Success $result") case Failure(throwable) => println(s"Failure $throwable") } ...
object PromisingFutures{ def main(args: Array[String]) { val promisingFutures=new PromisingFutures promisingFutures.printFuture(promisingFutures.oneFuture) promisingFutures.printFuture(promisingFutures.oneDangerousFuture) synchronized(wait(3000)) } }
Kết quả
Success 1 Failure SomeComputationException: Welcome to the Dark side !
Như mong đợi thì oneFuture sẽ là Succes case và có giá trị là 1, trong khi oneDangerousFuture sẽ là Failure case và sẽ in ra một Exception