Scala Future – Combinator và Async

Trong phần trước, chúng ta đã trình bày về Future và Promise. Trong phần này, chúng ta sẽ kết hợp các future sử dụng thế mạnh Combinator của nó

Kết hợp Combinator

Trong phần đầu tiên, chúng ta đã hiểu làm thế nào để lấy được giá trị từ Future sử dụng onComplete, foreach và trong test case sử dụng Await.result. Nhờ đó, việc lấy giá trị từ một Future riêng lẻ là hoàn hảo nhưng trong nhiều trường hợp chúng ta cần xử lý lấy kết quả của nhiều tiến trình hạy bất đồng bộ cùng một lúc và đợi cho đến các Future chạy xong để thu được kết quả tổng hợp cuối cùng. Thậm chí, đôi khi kết qủa của một Future được cung cấp bởi một chuỗi hoặc một future khác.

Future là một Monad. Chúng ta có thể hiểu một cách đơn giản như sau:

  1. Future là một container chứa các giá trị với một số kiểu ( Nó chấp nhận một kiểu như tham số đầu vào và không thể tồn tại mà không có nó). Bạn có thể có Future[Int] hoặc Future[String] hoặc Future[Class bất kỳ] – Bạn không thể chỉ có một Future duy nhất. Một thuật ngữ liên tưởng đến chính là type-constructor.  Để so sánh, List là một type constructor ( và cũng là một Monad). List là một container chứa các giá trị với kiểu là Int, String hoặc bất kỳ kiểu nào khác. Một List/ Future mà không chứa đựng một kiểu là không tồn tại.
  2. Future có các hàm flatmapunit (tất nhiên là cả hàm map)

Lý do tôi đưa ra điều này là thay vì sử dụng callback onComplete hoặc foreach chúng ta đơn giản chỉ cần sử dụng map hoặc flatmap. Kết qủa của Future giống như chúng ta làm với một Option hoặc một List

Nào chúng ta hãy cùng xem các Combinator: map và flatmap.

Thực hiện các Future tuần tự

Hãy thử chạy một nhiệm vụ đơn giản là thêm 3 số được tính toán bất đồng bộ sau một khoảng thời gian

Chú ý: Source code bên dưới hơi lộn xộn và các Future được chạy một cách tuần tự

Code

class FutureCombinators {
  def sumOfThreeNumbersSequentialMap(): Future[Int] = {
    Future {
      Thread.sleep(1000)
      1
    }.flatMap { oneValue =>
      Future {
        Thread.sleep(2000)
        2
      }.flatMap { twoValue =>
        Future {
          Thread.sleep(3000)
          3
        }.map { thirdValue =>
          oneValue + twoValue + thirdValue
        }
      }
    }
  }
...
...

Hàm Future đầu tiên trả về 1 sau 1 giây, hàm thứ 2 trả về 2 sau 2 giây và hàm thứ 3 trả về 3 sau 3 giây. Đoạn code được lồng vào cuối cùng sẽ tính toán tổng của 3 giá trị và trả về một Future[Int]

Testcase

Để đo được thời gian tính toán các giá trị, chúng ta sẽ viết một hàm tiện ích đơn giản ( trong  trait ConcurrentUtils ) được gọi để tính toán và in ra thời gian chạy của một đoạn code.

Chúng ta dùng hàm Await.result để thực hiện chờ và lấy kết quả của futureCombinators.sumOfThreeNumbersSequentialMap.  Chúng ta sẽ tính toán tổng thời gian chạy và in ra gía trị

describe("Futures") {
    it("could be composed using map") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialMap(), 7 seconds))
      result shouldBe 6
    }
  }
...
...
}

trait ConcurrentUtils {  
  def timed[T](block: => T): T = {
    val start = System.currentTimeMillis()
    val result = block
    val duration = System.currentTimeMillis() - start
    println(s"Time taken : $duration")
    result
  }
}

Output

Time taken : 6049

Hàm mất 6 giây để thực hiện, nhờ đó chúng ta biết rằng các hàm Future được chạy một cách tuần tự

Thực hiện các Future song song

Các bạn đã nhìn thấy, đoạn code bên trên chạy 3 Future một cách tuần tự và vì thế mất tổng thời gian là 6 giây để hoàn thành các tính toán. Như vầy là không tối ưu đúng ko? Các Future cần được chạy song song. Mục đích để thực hiện được điều này, chúng ta cần khai báo tách biệt chúng ra và lấy giá trị của từng Future.

Code

val oneFuture: Future[Int] = Future {
    Thread.sleep(1000)
    1
}

val twoFuture: Future[Int] = Future {
    Thread.sleep(2000)
    2
}

val threeFuture: Future[Int] = Future {
    Thread.sleep(3000)
    3
}

Tiếp theo, chúng ta sử dụng For – comprehension để tính toán giá trị

def sumOfThreeNumbersParallelMapForComprehension(): Future[Int] = for {  
    oneValue <- oneFuture
    twoValue <- twoFuture
    threeValue <- threeFuture
} yield oneValue + twoValue + threeValue

Testcase

Hãy tính toàn thời gian và xác định giá trị chính xác bằng cách sử dụng testcase sau đây:

describe("Futures that are executed in parallel") {
   it("could be composed using for comprehensions") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallel(), 4 seconds))
      result shouldBe 6
   }
}

Output

Time taken : 3005

Như chúng ta nhìn thấy, hàm sumOfThreeNumbersParallel mất gần như cùng lúc với thời gian của Future lâu nhất (threeFuture) là 3 giây.

Để so sánh một cách rõ ràng hơn, đoạn code bên trên sẽ được viết lại mà không dùng hàm for-comprehension như sau:

def sumOfThreeNumbersParallelMap(): Future[Int] = oneFuture.flatMap { oneValue =>  
    twoFuture.flatMap { twoValue =>
      threeFuture.map { threeValue =>
        oneValue + twoValue + threeValue
      }
    }
}

Guard trong For-Comprehension

Giống như chúng ta thêm một Guard: Mệnh đề if trong For-comprehensiontrên List và các collection khác (aka hoặc các kiểu Monad), chúng ta có thể thêm các Guard cho Future như vậy. Một guard if sau đây sẽ kiểm tra điều kiện là giá trị được trả về bởi twoFuture là lớn hơn 1 như sau:

def sumOfThreeNumbersParallelWithGuard(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 1
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue

Hãy cùng viết lại để hình dung thực tế nó chạy như thế nào (tất nhiên tôi đảm bảo rằng 90%, mọi người sẽ không muốn dùng cách này)

def sumOfThreeNumbersMapAndFlatMapWithFilter(): Future[Int] = oneFuture.flatMap { oneValue =>  
    twoFuture.withFilter(_ > 1).flatMap { twoValue =>
      threeFuture.map { threeValue =>
        oneValue + twoValue + threeValue
      }
    }
  }

Guard trong For-Comprehension – Trường hợp lỗi

Nếu Guard đánh giá false bằng cách tạo ra một Failure, thì một NoSuchElementException sẽ được đưa ra. Chúng ta sẽ thử thay đổi điều kiện của Guard tìm một biểu thức là false

def sumOfThreeNumbersParallelWithGuardAndFailure(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 2
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue

Output

Future.filter predicate is not satisfied  
java.util.NoSuchElementException: Future.filter predicate is not satisfied  
    at scala.concurrent.Future$$anonfun$filter$1.apply(Future.scala:280)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Success.map(Try.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Kiểm soát Exception

Giống như NoSuchElementException được đưa ra bởi guard,  code được chạy bất đồng bộ trong Future cũng có thể đưa ra một Exception trong nhiều trường hợp khác nhau. Trong khi người ta tranh luận rằng Exception là rất không giống như FP, may mắn với một ứng dụng được phân tán hoặc thông qua sử dụng thư viên Java bên trong Future, Exception được xảy ra.

Code

Cả 2 hàm sau đây đều đưa ra Exception – Hàm đầu tiên đưa ra một NoSuchElementException, còn hàm thứ 2 đưa ra một LegacyException

//NoSuchElementException
  def throwsNoSuchElementIfGuardFails(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 2
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue

//LegacyException

  val futureCallingLegacyCode: Future[Int] = Future {
    Thread.sleep(1000)
    throw new LegacyException("Danger! Danger!")
  }

  def throwsExceptionFromComputation(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCode
  } yield oneValue + futureThrowingException

case class LegacyException(msg: String) extends Exception(msg)

Testcase

describe("Futures that throw exception") {
    it("could blow up on the caller code when guard fails") {
      val futureCombinators = new FutureCombinators
      intercept[NoSuchElementException] {
        val result = timed(Await.result(futureCombinators.throwsNoSuchElementIfGuardFails(), 4 seconds))
      }
    }

    it("could blow up on the caller code when exception comes from a computation executed inside the Future") {
      val futureCombinators = new FutureCombinators
      intercept[LegacyException] {
        val result = timed(Await.result(futureCombinators.throwsExceptionFromComputation(), 4 seconds))
      }
    }
...
...

chú ý rằng nếu một kết quả của các Future đưa ra một Exception, thì toàn bộ kết quả tính toán sẽ đưa ra kết quả là exception

Recover từ Exception

Sử dụng Recover

Nếu một Future đưa ra một Exception như là scala.util.control.NonFatalvà chúng ta muốn kiểm soát điều này hoặc muốn thay đổi thông tin về lỗi rõ ràng hơn cho bên gọi, chúng ta có thể sử dụng hàm recover.  recover là giống như catch

Nào chúng ta sẽ cùng thử thay đổi hàm throwsExceptionFromComputation mà đang đưa ra exception là LegacyException. Hàm recover sẽ nhận một PartialFunction mà ánh xạ từ Throwable đến kiểu mà Future đang được chỉ định.

Code

Hãy xem đoạn code bên dưới đây, nếu futureCallingLegacyCode đưa ra một Exception, thì giá trị của kết quả tính toán sẽ được gán lại bằng 200. Nếu không có lỗi xảy ra ( không bị Exception) thì nó sẽ lấy giá trị được tính toán bởi hàm

val futureCallingLegacyCodeWithRecover: Future[Int] = futureCallingLegacyCode.recover {
    case LegacyException(msg) => 200
  }

  def recoversFromExceptionUsingRecover(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCodeWithRecover
  } yield oneValue + futureThrowingException

Nên nhớ rằng, nếu Future ban đầu đưa ra một giá trị thành công, thì đoạn code recover sẽ không bao giờ được chạy qua.  Tương tự thì nếu PartialFunctionbên trong hàm recoverkhông vào trường hợp mà chúng ta mong muốn xử lý Exception lúc đầu (không vào trường hợp  LegacyException), các Exception thu được sẽ trả luôn về cho phía gọi.

Testcase

Testcase giúp xác nhận rằng giá trị tính toán bằng tổng các giá trị được trả về bởi oneFuture ( bằng 1) và futureCallingLegacyCodeWithRecover (bằng 200)

it("could be recovered with a recovery value") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecover(), 2 seconds))
      result shouldBe 201
    }

Output

Time taken : 1004

Sử dụng recoverwith

Thay vì sử dụng recover để lấy giá trị khi kết quả trả về của Future là một Exception, chúng ta có thể muốn lấy các Exception của các Future khác trong một vài trường hợp. Có thể hiểu như sau, khi cúng ta không thể gọi kết nối HTTP đến Server1 do lỗi mạng thì chúng tha có thể khắc phục (recover) bằng cách (with) chuyển kết nối HTTP sang Server2

Giống như recover thì recoverwith nhận một PartialFunction. Tuy nhiên PartialFunction ánh xạ một Throwable với Future có kiểu giống với Future ban đầu

Giống như recover, nếu Future chính mà recoverwith được gọi khi lỗi xảy ra, thì Future được ánh xạ đến PartialFunctionđược gọi. Nếu Future thứ hai trả về giá trị thành công, thì một kết quả mới được trả về.

Code

val futureCallingLegacyCodeWithRecoverWith: Future[Int] = futureCallingLegacyCode.recoverWith {
    case LegacyException(msg) =>
      println("Exception occurred. Recovering with a Future that wraps 1000")
      Thread.sleep(2000)
      Future(1000)
  }

  def recoversFromExceptionUsingRecoverWith(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCodeWithRecoverWith
  } yield oneValue + futureThrowingException

Testcase

oneFuture mất 1 giây, recover Future thì mất 2 giây. Nên chúng ta đặt một Await.result timeout là 4 giây. Kết quả cuối cùng sẽ là 1001 bằng tổng của oneFuture và futureCallingLegacyCodeWithRecoverWith

it("could be recovered with a recovery Future") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWith(), 4 seconds))
      result shouldBe 1001
    }

Output

Time taken : 3006

Chú ý rằng giống như recover, nếu Future thứ 2 bị lỗi thì, thì sẽ đưa ra một Exception và trả về cho phía gói 

Code

Trong code sau đây, chúng ta tạo một hàm Future khác mà đưa ra Exception với nội dung là “Dieded!! “. và chúng ta recover Future đầu tiên với error-throwing-Future. Testcase sẽ kiểm tra Exception từ Future thứ 2 (recover 1 lần) bắt được và Throw lại về phía bên gọi

val anotherErrorThrowingFuture: Future[Int] = Future {
    Thread.sleep(1000)
    throw new LegacyException("Dieded!!")
  }

  val futureRecoveringWithAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.recoverWith {
    case LegacyException(msg) =>
      anotherErrorThrowingFuture
  }

  def recoversFromExceptionUsingRecoverWithThatFails(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureRecoveringWithAnotherErrorThrowingFuture
  } yield oneValue + futureThrowingException

Testcase

it("when recovered with another Future that throws Exception would throw the error from the second Future") {
      val futureCombinators = new FutureCombinators
      val exception = intercept[LegacyException] {
        timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWithThatFails(), 4 seconds))
      }
      exception.msg shouldBe "Dieded!!"
    }

Sử dụng fallbackTo

fallbackTo hoạt động giống như recoverwith khi nói đến giá trị trả về thành công. Nó sử dụng giá trị Future đầu tiên nếu thành công hoặc chuyển sang giá trị của Future thứ 2. Tuy nhiên, nếu cả 2 Future đều lỗi , thì lỗi sẽ được trả về phía bên gọi nhưng lỗi từ Future đầu tiên chứ không phải Future thứ 2

Code

Chúng ta sẽ dử dụng lại Future mà chúng ta dùng với recoverwith

val futureFallingBackToAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.fallbackTo (anotherErrorThrowingFuture)

def recoversFromExceptionUsingFallbackTo(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureFallingBackToAnotherErrorThrowingFuture
} yield oneValue + futureThrowingException

Chú ý rằng fallbackTo chỉ chấp nhập Future khác và không phải là PartialFunctiongiống như recoverwith

Testcase

it("when fallen back to another Future that throws Exception would throw the error from the first Future") {
      val futureCombinators = new FutureCombinators
      val exception = intercept[LegacyException] {
        timed(Await.result(futureCombinators.recoversFromExceptionUsingFallbackTo(), 4 seconds))
      }
      exception.msg shouldBe "Danger! Danger!"
}

Combinator hữu ích và thú vị khác

Sau đây là danh sách ngắn gọn của các Combinator Future khác mà rất là hữu ích

Zip

zip hoạt đọng giống như List.zip. Nó giúp hợp 2 Future lại và tạo ra một Future kiểu Tuple

def zipTwoFutures:Future[(Int,Int)]=oneFuture zip twoFuture

firstCompleteOf

firstCompleteOf thực sự hữu ích khi bạn có 2 service tương đương và bạn muốn tiếp tục khi service nhanh nhất trả về một giá trị

val listOfFutures = List(oneFuture,twoFuture,threeFuture)
def getFirstResult():Future[Int]= Future.firstCompletedOf(listOfFutures)

Trong trường hợp trên thì oneFuture sẽ trả về giá trị nhanh nhất

sequence

sequence là hơi ảo. Khi bạn có một List[Future[Int]] giống như List(oneFuture,twoFuture,threeFuture)và bạn mong muốn rằng tất cả giá trị trả lại sẽ là một List[Int] thay cho mỗi Int được đặt trong Future. sequence lấy List[Future[Int]] và biến đổi nó thành Future[List[Int]]

def getResultsAsList():Future[List[Int]] = Future.sequence(listOfFutures)

Scala-async library

Scala-async library là một project bên ngoài nhưng có thể được thêm vào dự án bằng cách thêm dependencyvào build.sbt

org.scala-lang.modules" %% "scala-async" % "0.9.6-RC2

async thư viện có 2 hàm hữu ích trong package scala.async.Async – asyncawait

async

Hàm async rất giống với hàm Future.apply. Trong thực tế việc sử dụng chúng rất giống nhau và chúng ta có thể thoải mái thay Future.apply bằng async bất cứ khi nào dùng

Future.apply

def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T]

async

def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T]

Ưu điểm hàng đầu của việc sử dụng async thay cho Future.apply, ngoài khả năng dễ đọc ra là tạo ra mỗi Future khi sử dụng For-comprehense, Compiler sẽ đưa ra các anonymous class tách rời trong khi với async nó chỉ có một anonymous class duy nhất. Do đó, chúng ta có thể viết lại hàm oneFuture như sau:

val oneFuture: Future[Int] = async {
    Thread.sleep(1000)
    1
}

await

Hàm await nhận một Future và trả về kết quả. Nhưng không giống như Await.result đưa vào một Future và cũng trả về một kết quả ? Không, điểm khác biệt quan trọng là Await.result là blocking và không nên sử dụng trên Production code ngoại trừ testcode. Hàm await, được thực hiện bằng cách sử dụng macro Scala và hoàn thành là nó trả về kết quả của Future bằng cách sử dụng callback onComplete

Code

Nào chúng ta sẽ thử viết lại hàm sum-of-three-numbersdùng async và await

def sumOfThreeNumbersParallelWithAsyncAwait(): Future[Int] = async {
    await(oneFuture) + await(twoFuture) + await(threeFuture)
}

Testcase

it("could be composed using async/await") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallelWithAsyncAwait(), 4 seconds))
      result shouldBe 6
}

Như chúng ta đã thấy, code được viết theo cách này không chỉ là chạy bất đồng bộ mà  trông cũng tự nhiên. Chúng ta cũng thấy rõ rằng for-comprehense là bước nhảy vọt lớn từ việc sử dụng map, flatmap nhưng async/await lại tiến thêm một bước lớn nữa

 

 

 

You May Also Like

About the Author: Nguyen Dinh Thuc

Leave a Reply

Your email address will not be published.