Trong phần trước, chúng ta đã trình bày về Future và Promise. Trong phần này, chúng ta sẽ kết hợp các future sử dụng thế mạnh Combinator của nó
Kết hợp Combinator
Trong phần đầu tiên, chúng ta đã hiểu làm thế nào để lấy được giá trị từ Future sử dụng onComplete
, foreach
và trong test case sử dụng Await.result.
Nhờ đó, việc lấy giá trị từ một Future riêng lẻ là hoàn hảo nhưng trong nhiều trường hợp chúng ta cần xử lý lấy kết quả của nhiều tiến trình hạy bất đồng bộ cùng một lúc và đợi cho đến các Future chạy xong để thu được kết quả tổng hợp cuối cùng. Thậm chí, đôi khi kết qủa của một Future được cung cấp bởi một chuỗi hoặc một future khác.
Future là một Monad. Chúng ta có thể hiểu một cách đơn giản như sau:
- Future là một container chứa các giá trị với một số kiểu ( Nó chấp nhận một kiểu như tham số đầu vào và không thể tồn tại mà không có nó). Bạn có thể có
Future[Int]
hoặcFuture[String]
hoặcFuture[Class bất kỳ]
– Bạn không thể chỉ có một Future duy nhất. Một thuật ngữ liên tưởng đến chính là type-constructor. Để so sánh, List là một type constructor ( và cũng là một Monad). List là một container chứa các giá trị với kiểu là Int, String hoặc bất kỳ kiểu nào khác. Một List/ Future mà không chứa đựng một kiểu là không tồn tại. - Future có các hàm
flatmap
vàunit
(tất nhiên là cả hàmmap
)
Lý do tôi đưa ra điều này là thay vì sử dụng callback onComplete
hoặc foreach chúng ta đơn giản chỉ cần sử dụng map
hoặc flatmap
. Kết qủa của Future giống như chúng ta làm với một Option
hoặc một List
Nào chúng ta hãy cùng xem các Combinator: map và flatmap.
Thực hiện các Future tuần tự
Hãy thử chạy một nhiệm vụ đơn giản là thêm 3 số được tính toán bất đồng bộ sau một khoảng thời gian
Chú ý: Source code bên dưới hơi lộn xộn và các Future được chạy một cách tuần tự
Code
class FutureCombinators { def sumOfThreeNumbersSequentialMap(): Future[Int] = { Future { Thread.sleep(1000) 1 }.flatMap { oneValue => Future { Thread.sleep(2000) 2 }.flatMap { twoValue => Future { Thread.sleep(3000) 3 }.map { thirdValue => oneValue + twoValue + thirdValue } } } } ... ...
Hàm Future đầu tiên trả về 1 sau 1 giây, hàm thứ 2 trả về 2 sau 2 giây và hàm thứ 3 trả về 3 sau 3 giây. Đoạn code được lồng vào cuối cùng sẽ tính toán tổng của 3 giá trị và trả về một Future[Int]
Testcase
Để đo được thời gian tính toán các giá trị, chúng ta sẽ viết một hàm tiện ích đơn giản ( trong trait ConcurrentUtils
) được gọi để tính toán và in ra thời gian chạy của một đoạn code.
Chúng ta dùng hàm Await.result
để thực hiện chờ và lấy kết quả của futureCombinators.sumOfThreeNumbersSequentialMap
. Chúng ta sẽ tính toán tổng thời gian chạy và in ra gía trị
describe("Futures") { it("could be composed using map") { val futureCombinators = new FutureCombinators val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialMap(), 7 seconds)) result shouldBe 6 } } ... ... } trait ConcurrentUtils { def timed[T](block: => T): T = { val start = System.currentTimeMillis() val result = block val duration = System.currentTimeMillis() - start println(s"Time taken : $duration") result } }
Output
Time taken : 6049
Hàm mất 6 giây để thực hiện, nhờ đó chúng ta biết rằng các hàm Future được chạy một cách tuần tự
Thực hiện các Future song song
Các bạn đã nhìn thấy, đoạn code bên trên chạy 3 Future một cách tuần tự và vì thế mất tổng thời gian là 6 giây để hoàn thành các tính toán. Như vầy là không tối ưu đúng ko? Các Future cần được chạy song song. Mục đích để thực hiện được điều này, chúng ta cần khai báo tách biệt chúng ra và lấy giá trị của từng Future.
Code
val oneFuture: Future[Int] = Future { Thread.sleep(1000) 1 } val twoFuture: Future[Int] = Future { Thread.sleep(2000) 2 } val threeFuture: Future[Int] = Future { Thread.sleep(3000) 3 }
Tiếp theo, chúng ta sử dụng For – comprehension để tính toán giá trị
def sumOfThreeNumbersParallelMapForComprehension(): Future[Int] = for { oneValue <- oneFuture twoValue <- twoFuture threeValue <- threeFuture } yield oneValue + twoValue + threeValue
Testcase
Hãy tính toàn thời gian và xác định giá trị chính xác bằng cách sử dụng testcase sau đây:
describe("Futures that are executed in parallel") { it("could be composed using for comprehensions") { val futureCombinators = new FutureCombinators val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallel(), 4 seconds)) result shouldBe 6 } }
Output
Time taken : 3005
Như chúng ta nhìn thấy, hàm sumOfThreeNumbersParallel
mất gần như cùng lúc với thời gian của Future lâu nhất (threeFuture
) là 3 giây.
Để so sánh một cách rõ ràng hơn, đoạn code bên trên sẽ được viết lại mà không dùng hàm for-comprehension
như sau:
def sumOfThreeNumbersParallelMap(): Future[Int] = oneFuture.flatMap { oneValue => twoFuture.flatMap { twoValue => threeFuture.map { threeValue => oneValue + twoValue + threeValue } } }
Guard trong For-Comprehension
Giống như chúng ta thêm một Guard: Mệnh đề if trong For-comprehension
trên List
và các collection khác (aka hoặc các kiểu Monad), chúng ta có thể thêm các Guard cho Future như vậy. Một guard if sau đây sẽ kiểm tra điều kiện là giá trị được trả về bởi twoFuture là lớn hơn 1 như sau:
def sumOfThreeNumbersParallelWithGuard(): Future[Int] = for { oneValue <- oneFuture twoValue <- twoFuture if twoValue > 1 threeValue <- threeFuture } yield oneValue + twoValue + threeValue
Hãy cùng viết lại để hình dung thực tế nó chạy như thế nào (tất nhiên tôi đảm bảo rằng 90%, mọi người sẽ không muốn dùng cách này)
def sumOfThreeNumbersMapAndFlatMapWithFilter(): Future[Int] = oneFuture.flatMap { oneValue => twoFuture.withFilter(_ > 1).flatMap { twoValue => threeFuture.map { threeValue => oneValue + twoValue + threeValue } } }
Guard trong For-Comprehension – Trường hợp lỗi
Nếu Guard đánh giá false bằng cách tạo ra một Failure, thì một NoSuchElementException
sẽ được đưa ra. Chúng ta sẽ thử thay đổi điều kiện của Guard tìm một biểu thức là false
def sumOfThreeNumbersParallelWithGuardAndFailure(): Future[Int] = for { oneValue <- oneFuture twoValue <- twoFuture if twoValue > 2 threeValue <- threeFuture } yield oneValue + twoValue + threeValue
Output
Future.filter predicate is not satisfied java.util.NoSuchElementException: Future.filter predicate is not satisfied at scala.concurrent.Future$$anonfun$filter$1.apply(Future.scala:280) at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) at scala.util.Try$.apply(Try.scala:192) at scala.util.Success.map(Try.scala:237) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Kiểm soát Exception
Giống như NoSuchElementException
được đưa ra bởi guard, code được chạy bất đồng bộ trong Future cũng có thể đưa ra một Exception trong nhiều trường hợp khác nhau. Trong khi người ta tranh luận rằng Exception là rất không giống như FP, may mắn với một ứng dụng được phân tán hoặc thông qua sử dụng thư viên Java bên trong Future, Exception được xảy ra.
Code
Cả 2 hàm sau đây đều đưa ra Exception – Hàm đầu tiên đưa ra một NoSuchElementException
, còn hàm thứ 2 đưa ra một LegacyException
//NoSuchElementException def throwsNoSuchElementIfGuardFails(): Future[Int] = for { oneValue <- oneFuture twoValue <- twoFuture if twoValue > 2 threeValue <- threeFuture } yield oneValue + twoValue + threeValue //LegacyException val futureCallingLegacyCode: Future[Int] = Future { Thread.sleep(1000) throw new LegacyException("Danger! Danger!") } def throwsExceptionFromComputation(): Future[Int] = for { oneValue <- oneFuture futureThrowingException <- futureCallingLegacyCode } yield oneValue + futureThrowingException case class LegacyException(msg: String) extends Exception(msg)
Testcase
describe("Futures that throw exception") { it("could blow up on the caller code when guard fails") { val futureCombinators = new FutureCombinators intercept[NoSuchElementException] { val result = timed(Await.result(futureCombinators.throwsNoSuchElementIfGuardFails(), 4 seconds)) } } it("could blow up on the caller code when exception comes from a computation executed inside the Future") { val futureCombinators = new FutureCombinators intercept[LegacyException] { val result = timed(Await.result(futureCombinators.throwsExceptionFromComputation(), 4 seconds)) } } ... ...
chú ý rằng nếu một kết quả của các Future đưa ra một Exception, thì toàn bộ kết quả tính toán sẽ đưa ra kết quả là exception
Recover từ Exception
Sử dụng Recover
Nếu một Future đưa ra một Exception như là scala.util.control.NonFatal
và chúng ta muốn kiểm soát điều này hoặc muốn thay đổi thông tin về lỗi rõ ràng hơn cho bên gọi, chúng ta có thể sử dụng hàm recover
. recover
là giống như catch
Nào chúng ta sẽ cùng thử thay đổi hàm throwsExceptionFromComputation
mà đang đưa ra exception là LegacyException
. Hàm recover sẽ nhận một PartialFunction
mà ánh xạ từ Throwable
đến kiểu mà Future đang được chỉ định.
Code
Hãy xem đoạn code bên dưới đây, nếu futureCallingLegacyCode
đưa ra một Exception, thì giá trị của kết quả tính toán sẽ được gán lại bằng 200. Nếu không có lỗi xảy ra ( không bị Exception) thì nó sẽ lấy giá trị được tính toán bởi hàm
val futureCallingLegacyCodeWithRecover: Future[Int] = futureCallingLegacyCode.recover { case LegacyException(msg) => 200 } def recoversFromExceptionUsingRecover(): Future[Int] = for { oneValue <- oneFuture futureThrowingException <- futureCallingLegacyCodeWithRecover } yield oneValue + futureThrowingException
Nên nhớ rằng, nếu Future ban đầu đưa ra một giá trị thành công, thì đoạn code recover
sẽ không bao giờ được chạy qua. Tương tự thì nếu PartialFunction
bên trong hàm recover
không vào trường hợp mà chúng ta mong muốn xử lý Exception lúc đầu (không vào trường hợp LegacyException
), các Exception thu được sẽ trả luôn về cho phía gọi.
Testcase
Testcase giúp xác nhận rằng giá trị tính toán bằng tổng các giá trị được trả về bởi oneFuture
( bằng 1) và futureCallingLegacyCodeWithRecover
(bằng 200)
it("could be recovered with a recovery value") { val futureCombinators = new FutureCombinators val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecover(), 2 seconds)) result shouldBe 201 }
Output
Time taken : 1004
Sử dụng recoverwith
Thay vì sử dụng recover để lấy giá trị khi kết quả trả về của Future là một Exception, chúng ta có thể muốn lấy các Exception của các Future khác trong một vài trường hợp. Có thể hiểu như sau, khi cúng ta không thể gọi kết nối HTTP đến Server1 do lỗi mạng thì chúng tha có thể khắc phục (recover) bằng cách (with) chuyển kết nối HTTP sang Server2
Giống như recover thì recoverwith nhận một PartialFunction
. Tuy nhiên PartialFunction
ánh xạ một Throwable
với Future có kiểu giống với Future ban đầu
Giống như recover, nếu Future chính mà recoverwith được gọi khi lỗi xảy ra, thì Future được ánh xạ đến PartialFunction
được gọi. Nếu Future thứ hai trả về giá trị thành công, thì một kết quả mới được trả về.
Code
val futureCallingLegacyCodeWithRecoverWith: Future[Int] = futureCallingLegacyCode.recoverWith { case LegacyException(msg) => println("Exception occurred. Recovering with a Future that wraps 1000") Thread.sleep(2000) Future(1000) } def recoversFromExceptionUsingRecoverWith(): Future[Int] = for { oneValue <- oneFuture futureThrowingException <- futureCallingLegacyCodeWithRecoverWith } yield oneValue + futureThrowingException
Testcase
oneFuture
mất 1 giây, recover Future thì mất 2 giây. Nên chúng ta đặt một Await.result
timeout là 4 giây. Kết quả cuối cùng sẽ là 1001 bằng tổng của oneFuture
và futureCallingLegacyCodeWithRecoverWith
it("could be recovered with a recovery Future") { val futureCombinators = new FutureCombinators val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWith(), 4 seconds)) result shouldBe 1001 }
Output
Time taken : 3006
Chú ý rằng giống như recover, nếu Future thứ 2 bị lỗi thì, thì sẽ đưa ra một Exception và trả về cho phía gói
Code
Trong code sau đây, chúng ta tạo một hàm Future khác mà đưa ra Exception với nội dung là “Dieded!! “. và chúng ta recover Future đầu tiên với error-throwing-Future. Testcase sẽ kiểm tra Exception từ Future thứ 2 (recover 1 lần) bắt được và Throw lại về phía bên gọi
val anotherErrorThrowingFuture: Future[Int] = Future { Thread.sleep(1000) throw new LegacyException("Dieded!!") } val futureRecoveringWithAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.recoverWith { case LegacyException(msg) => anotherErrorThrowingFuture } def recoversFromExceptionUsingRecoverWithThatFails(): Future[Int] = for { oneValue <- oneFuture futureThrowingException <- futureRecoveringWithAnotherErrorThrowingFuture } yield oneValue + futureThrowingException
Testcase
it("when recovered with another Future that throws Exception would throw the error from the second Future") { val futureCombinators = new FutureCombinators val exception = intercept[LegacyException] { timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWithThatFails(), 4 seconds)) } exception.msg shouldBe "Dieded!!" }
Sử dụng fallbackTo
fallbackTo
hoạt động giống như recoverwith
khi nói đến giá trị trả về thành công. Nó sử dụng giá trị Future đầu tiên nếu thành công hoặc chuyển sang giá trị của Future thứ 2. Tuy nhiên, nếu cả 2 Future đều lỗi , thì lỗi sẽ được trả về phía bên gọi nhưng lỗi từ Future đầu tiên chứ không phải Future thứ 2
Code
Chúng ta sẽ dử dụng lại Future mà chúng ta dùng với recoverwith
val futureFallingBackToAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.fallbackTo (anotherErrorThrowingFuture) def recoversFromExceptionUsingFallbackTo(): Future[Int] = for { oneValue <- oneFuture futureThrowingException <- futureFallingBackToAnotherErrorThrowingFuture } yield oneValue + futureThrowingException
Chú ý rằng fallbackTo
chỉ chấp nhập Future khác và không phải là PartialFunction
giống như recoverwith
Testcase
it("when fallen back to another Future that throws Exception would throw the error from the first Future") { val futureCombinators = new FutureCombinators val exception = intercept[LegacyException] { timed(Await.result(futureCombinators.recoversFromExceptionUsingFallbackTo(), 4 seconds)) } exception.msg shouldBe "Danger! Danger!" }
Combinator hữu ích và thú vị khác
Sau đây là danh sách ngắn gọn của các Combinator Future khác mà rất là hữu ích
Zip
zip hoạt đọng giống như List.zip
. Nó giúp hợp 2 Future lại và tạo ra một Future kiểu Tuple
def zipTwoFutures:Future[(Int,Int)]=oneFuture zip twoFuture
firstCompleteOf
firstCompleteOf
thực sự hữu ích khi bạn có 2 service tương đương và bạn muốn tiếp tục khi service nhanh nhất trả về một giá trị
val listOfFutures = List(oneFuture,twoFuture,threeFuture) def getFirstResult():Future[Int]= Future.firstCompletedOf(listOfFutures)
Trong trường hợp trên thì oneFuture
sẽ trả về giá trị nhanh nhất
sequence
sequence
là hơi ảo. Khi bạn có một List[Future[Int]]
giống như List(oneFuture,twoFuture,threeFuture)
và bạn mong muốn rằng tất cả giá trị trả lại sẽ là một List[Int]
thay cho mỗi Int
được đặt trong Future. sequence
lấy List[Future[Int]]
và biến đổi nó thành Future[List[Int]]
def getResultsAsList():Future[List[Int]] = Future.sequence(listOfFutures)
Scala-async library
Scala-async library là một project bên ngoài nhưng có thể được thêm vào dự án bằng cách thêm dependency
vào build.sbt
org.scala-lang.modules" %% "scala-async" % "0.9.6-RC2
async thư viện có 2 hàm hữu ích trong package scala.async.Async
– async
và await
async
Hàm async
rất giống với hàm Future.apply
. Trong thực tế việc sử dụng chúng rất giống nhau và chúng ta có thể thoải mái thay Future.apply
bằng async bất cứ khi nào dùng
Future.apply
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T]
async
def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T]
Ưu điểm hàng đầu của việc sử dụng async
thay cho Future.apply
, ngoài khả năng dễ đọc ra là tạo ra mỗi Future khi sử dụng For-comprehense, Compiler sẽ đưa ra các anonymous class tách rời trong khi với async nó chỉ có một anonymous class duy nhất. Do đó, chúng ta có thể viết lại hàm oneFuture
như sau:
val oneFuture: Future[Int] = async { Thread.sleep(1000) 1 }
await
Hàm await
nhận một Future và trả về kết quả. Nhưng không giống như Await.result
đưa vào một Future và cũng trả về một kết quả ? Không, điểm khác biệt quan trọng là Await.result
là blocking và không nên sử dụng trên Production code ngoại trừ testcode. Hàm await
, được thực hiện bằng cách sử dụng macro Scala và hoàn thành là nó trả về kết quả của Future bằng cách sử dụng callback onComplete
Code
Nào chúng ta sẽ thử viết lại hàm sum-of-three-numbers
dùng async và await
def sumOfThreeNumbersParallelWithAsyncAwait(): Future[Int] = async { await(oneFuture) + await(twoFuture) + await(threeFuture) }
Testcase
it("could be composed using async/await") { val futureCombinators = new FutureCombinators val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallelWithAsyncAwait(), 4 seconds)) result shouldBe 6 }
Như chúng ta đã thấy, code được viết theo cách này không chỉ là chạy bất đồng bộ mà trông cũng tự nhiên. Chúng ta cũng thấy rõ rằng for-comprehense là bước nhảy vọt lớn từ việc sử dụng map, flatmap nhưng async/await lại tiến thêm một bước lớn nữa