Scala Future – Promises Phần 2

Trong bài viết trước chúng ta đã hiểu làm thế nào để lấy được các giá trị trả về từ Fututre với onCompleteonSuccess and onFailure. Chúng ta cũng hiểu cách sử dụng Await.result trong Test case để block và lấy giá trị từ Future. Bài viết này sẽ viết tóm tắt về liên hệ giữa Promise và Future

Promise

Khái niệm về Promise và Future là đi đôi với nhau. scala.concurrent.Promise là một bộ giá trị của Future. Nói một cách khác, Promise là bộ não đứng sau việc thực hiện các tính toán bất đồng bộ, Future chỉ là một xử lý cho việc đọc kết quả khi nó sẵn sàng trả về. Hiểu đơn giản thì Promise là setter(tạo)  còn Future là getter(lấy về).

Thông thường, chúng ta không cần tạo Promise một cách rõ ràng trong code. Tuy nhiên, chúng ta cần hiểu Promise là gì, mục đích là giúp hiểu rõ hơn Future hoạt động như thế nào.

1. Hoàn tất một Promise

Với đoạn code ví dụ bên dưới, chúng ta sẽ hiểu giá trị được tạo trong Promise như thế nào, mặt khác làm thể nào để đọc được nó

  1. Tạo một Promise
  2. Hoàn tất Promise bằng cách gán cho nó một giá trị thành công
  3. Sau đó trở lại đọc bên phía Promise – Future phản hồi lại bên gọi bằng cách sử dụng promise.future

Không mất nhiều thời gian để tiến trình chạy theo kịch bản bên trên. Giá trị được đặt cho Promise ngay lập tức và do đó giá trị cũng có hiệu lực ngay lập tức qua Future

Code

class PromiseInternals {  
...
 def aCompletedPromiseUsingSuccess(num:Int): Future[Int] = {
    val promise=Promise[Int]()
    promise.success(num)
    promise.future
  }
...
...

Test case

Khi chúng ta chạy test code, hàm onComplete được gọi lại để lấy giá trị trả về ngay lập tức, sau đó promise.success(100) được gọi

class PromiseInternalsTest extends FunSpec with Matchers {  
  describe("A Future") {
    it("gives out the correct value when a Promise is completed") {
      val promiseInternals = new PromiseInternals
      val aCompletedPromise=promiseInternals.aCompletedPromiseUsingSuccess(100)
      assertValue(aCompletedPromise, 100)
    }
...
...

def assertValueUsingOnComplete(future: Future[Int], expectedValue: Int): Unit = {
    future.onComplete {
      case Success(result) => {
        println (s"Result is $result and expectedValue is $expectedValue")
        result shouldBe expectedValue
      }
      case Failure (msg) => fail(msg)
    }
}

promise.success chỉ là một cách ngắn gọn để sử dụng promise.complete mà coi một Try[T] như một tham số, nên chúng ta có thể thực hiện viết một hàm bên dưới như sau:

def aCompletedPromiseUsingComplete(num:Int): Future[Int] = {
    val promise=Promise[Int]()
    promise.complete(Success(num))
    promise.future
}

Ngoài ra, nếu bạn muốn chỉ định một tính toán là thất bại chúng ta có thể hoặc sử dụng promise.complete(Failure(throwable)) hoặc

def aCompletedPromiseUsingFailure(num:Int): Future[Int] = {
    val promise=Promise[Int]()
    promise.failure(new RuntimeException("Evil Exception"))
    promise.future
}

Nào chúng ta có thể tóm tắt lại trong hình ảnh như dưới đây

2. Chạy một đoạn bất đồng bộ

Bây giờ, Chúng ta đã hiểu làm thế nào để hoàn tất Promise bằng cách đặt một giá trị thành công hoặc một exception, chúng ta sẽ cùng xem làm thế nào để thực hiện một đoạn code bất đồng bộ.

Trong test code sau đây, chúng ta sẽ truyền một đoạn code vào hàm someExternalDelayedCalculation thực hiện bất đồng bộ.

Test case

Chúng ta hãy cùng nhìn test case trước:

  1. Chúng ta sẽ truyền vào đoạn code như một tham số.  Đoạn code đơn giản là sleep 2 giây và trả về 100
  2. Assert giá trị sau 3 giây

Khá đơn giản như sau

it("gives out the correct value when an asynchronous block is submitted and is completed through a Promise") {
      val promiseInternals = new PromiseInternals
      val longCalculationFuture = promiseInternals.someExternalDelayedCalculation{()=>
        Thread.sleep(2000)
        100
      }
      println (s"We have submitted a block to be executed asynchronously ${longCalculationFuture.isCompleted}") //false at this point
      assertValue(longCalculationFuture, 100)
    }

  def assertValue(future: Future[Int], expectedValue: Int): Unit = {
    val resultVal=Await.result(future, 3000 seconds)
    resultVal shouldBe expectedValue
}

Code

  1. Tạo một FixedThreadPool để chạy code bất đồng bộ
  2. Tạo một Promise
  3. Tạo một Runable và đóng gói đoạn code chạy bất đồng bộ vào trong phương thức run
  4. Kết thúc Promise và hoàn tất Promise bằng cách sử dụng kết quả của hàm run
  5. Thực thi Runable trong somePool của threadpool
  6. Trả về promise.future  từ đó có thể đọc được giá trị
val somePool=Executors.newFixedThreadPool(2)

def someExternalDelayedCalculation(f:()=>Int): Future[Int] = {
    val promise=Promise[Int]()
    val thisIsWhereWeCallSomeExternalComputation = new Runnable {
      override def run(): Unit ={
        promise.complete{
          try(Success(f()))
          catch {
            case NonFatal (msg)=> Failure(msg)
          }
        }
      }
    }

    somePool.execute(thisIsWhereWeCallSomeExternalComputation)
    promise.future
}

3. Hàm future.apply() được thực hiện như thế nào

Trong bài viết trước, chúng ta đã nhìn thấy một đoạn code được truyền vào trong hàm apply() của Future, nó được thực thi bất đồng bộ

Bây giờ chúng ta so sánh code bên dưới trong hàm someExternalDelayedCalculation với cách thực hiện trong thực tế của hàm Future.apply()runable được đóng gói trong đấy

def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
    val runnable = new PromiseCompletingRunnable(body)
    executor.prepare.execute(runnable)
    runnable.promise.future
}

class PromiseCompletingRunnable[T](body: => T) extends Runnable {  
    val promise = new Promise.DefaultPromise[T]()

    override def run() = {
      promise complete {
        try Success(body) catch { case NonFatal(e) => Failure(e) }
      }
    }
}

Lặp lại giống các bước ở bên trên với hàm apply()

  1. Giữ ThreadPool mà chúng ta cung cấp như implicit ExecutionContext  (hàm ẩn)
  2. Tạo một Promise bằng cách tạo một PromiseCompletingRunnable là một Runable
  3. Đóng gói một đoạn code chạy bất đồng bộ trong phương thức run
  4. Kết thúc Promise và hoàn tất promise bằng cách sử dụng kết quả của hàm run
  5. Thực hiện Runable sử dụng ExecutionContext
  6. Trả về promise.future  từ đó có thể đọc được giá trị

4. Lần một đã viết, lần hai lỗi

Promise được hoàn tất với thành công hoặc thất bại, tất cả chúng ta có thể làm sau đó là lấy được giá trị của nó từ Future.  onComplete, hàm gọi lại của Future cũng được gọi. Các giá trị được đóng gói trong Future của Promise được đặt và không thể thay đổi được

Nếu chúng ta thử gán một giá trị mới cho Promise đã hoàn tất rồi thì một IllegalStateException sẽ xảy ra

Code

Nào chúng ta hãy cũng nhìn đoạn code ngắn bên dưới. Theo đó, chúng ta sẽ tạo một Promise và hoàn tất nó với giá trị là 100. Chúng ta sau đó sẽ thử hoàn tất nó với một Failure

def alreadyCompletedPromise(): Future[Int] = {
    val promise = Promise[Int]()
    promise.success(100) //completed
    promise.failure(new RuntimeException("Will never be set because an IllegalStateException will be thrown beforehand"))
    promise.future
  }

Testcase

Test case sẽ xác nhận rằng một IllegalStateException xảy ra khi thử một Promise với một Failure

it("should throw an error if a Promise is attempted to be completed more than once") {
      val promiseInternals = new PromiseInternals
      intercept[IllegalStateException] {
        promiseInternals.alreadyCompletedPromise()
      }
    }

 

You May Also Like

About the Author: Nguyen Dinh Thuc

Leave a Reply

Your email address will not be published.