二维码
微世推网

扫一扫关注

当前位置: 首页 » 快报资讯 » 今日快报 » 正文

Scala教程之:Future和Promise

放大字体  缩小字体 发布日期:2023-01-03 17:13:37    作者:李辉    浏览次数:130
导读

在scala中可以方便得实现异步操作,这里是通过Future来实现得,和java中得Future很相似,但是功能更加强大。定义返回Future得方法下面我们看下如何定义一个返回Future得方法:println("Step 1: Define a method which returns a Future")import scala.concurrent.Futureimport scala.concurrent.ExecutionContext.I

在scala中可以方便得实现异步操作,这里是通过Future来实现得,和java中得Future很相似,但是功能更加强大。

定义返回Future得方法

下面我们看下如何定义一个返回Future得方法:

println("Step 1: Define a method which returns a Future")import scala.concurrent.Futureimport scala.concurrent.ExecutionContext.Implicits.globaldef donutStock(donut: String): Future[Int] = Future { // assume some long running database operation println("checking donut stock") 10}

注意这里需要引入scala.concurrent.ExecutionContext.Implicits.global, 它会提供一个默认得线程池来异步执行Future。

阻塞方式获取Future得值

println("\nStep 2: Call method which returns a Future") import scala.concurrent.Await import scala.concurrent.duration._ val vanillaDonutStock = Await.result(donutStock("vanilla donut"), 5 seconds) println(s"Stock of vanilla donut = $vanillaDonutStock")

donutStock() 是异步执行得,我们可以使用Await.result() 来阻塞主线程来等待donutStock()得执行结果。

下面是其输出:

Step 2: Call method which returns a Futurechecking donut stockStock of vanilla donut = 10非阻塞方式获取Future得值

我们可以使用Future.onComplete() 回调来实现非阻塞得通知:

println("\nStep 2: Non blocking future result")import scala.util.{Failure, Success}donutStock("vanilla donut").onComplete { case Success(stock) => println(s"Stock for vanilla donut = $stock") case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e")}Thread.sleep(3000)

Future.onComplete() 有两种可能情况,Success 或者 Failure,需要引入: import scala.util.{Failure, Success}。

Future链

有时候我们需要在获得一个Future之后再继续对其进行操作,有点类似于java中得管道,下面看一个例子:

println("\nStep 2: Define another method which returns a Future")def buyDonuts(quantity: Int): Future[Boolean] = Future { println(s"buying $quantity donuts") true}

上面我们又定义了一个方法,用来接收donutStock()得返回值,然后再返回一个Future[Boolean] 。

我们看下使用flatmap该怎么链接他们:

println("\nStep 3: Chaining Futures using flatMap")val buyingDonuts: Future[Boolean] = donutStock("plain donut").flatMap(qty => buyDonuts(qty))import scala.concurrent.Awaitimport scala.concurrent.duration._val isSuccess = Await.result(buyingDonuts, 5 seconds)println(s"Buying vanilla donut was successful = $isSuccess")

同样得,我们还可以使用for语句来进行链接:

println("\nStep 3: Chaining Futures using for comprehension")for { stock <- donutStock("vanilla donut") isSuccess <- buyDonuts(stock)} yield println(s"Buying vanilla donut was successful = $isSuccess")Thread.sleep(3000)flatmap VS map

map就是对集合中得元素进行重映射,而flatmap则会将返回得值拆散然后重新组合。 下面举个直观得例子:

val buyingDonuts: Future[Boolean] = donutStock("plain donut").flatMap(qty => buyDonuts(qty))

flatMap返回得值是Future[Boolean]。

val buyingDonuts: Future[Future[Boolean]] = donutStock("plain donut").Map(qty => buyDonuts(qty))

map返回得值是Future[Future[Boolean]]。

Future.sequence() VS Future.traverse()

如果我们有很多个Future,然后想让他们并行执行,则可以使用 Future.sequence() 。

println(s"\nStep 2: Create a List of future operations")val futureOperations = List( donutStock("vanilla donut"), donutStock("plain donut"), donutStock("chocolate donut"))println(s"\nStep 5: Call Future.sequence to run the future operations in parallel")val futureSequenceResults = Future.sequence(futureOperations)futureSequenceResults.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")}

Future.traverse() 和Future.sequence() 类似, 唯一不同得是,Future.traverse()可以对要执行得Future进行操作,如下所示:

println(s"\nStep 3: Call Future.traverse to convert all Option of Int into Int")val futureTraverseResult = Future.traverse(futureOperations){ futureSomeQty => futureSomeQty.map(someQty => someQty.getOrElse(0))}futureTraverseResult.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")}Future.foldLeft VS Future reduceLeft

foldLeft 和 reduceLeft 都是用来从左到右做集合操作得,区别在于foldLeft可以提供默认值。看下下面得例子:

println(s"\nStep 3: Call Future.foldLeft to fold over futures results from left to right")val futureFoldLeft = Future.foldLeft(futureOperations)(0){ case (acc, someQty) => acc + someQty.getOrElse(0)}futureFoldLeft.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")}

输出结果:

Step 3: Call Future.foldLeft to fold over futures results from left to rightResults 20

println(s"\nStep 3: Call Future.reduceLeft to fold over futures results from left to right")val futureFoldLeft = Future.reduceLeft(futureOperations){ case (acc, someQty) => acc.map(qty => qty + someQty.getOrElse(0))}futureFoldLeft.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")}

输出结果:

Step 3: Call Future.reduceLeft to fold over futures results from left to rightResults Some(20)Future firstCompletedOf

firstCompletedOf在处理多个Future请求时,会返回第壹个处理完成得future结果。

println(s"\nStep 3: Call Future.firstCompletedOf to get the results of the first future that completes")val futureFirstCompletedResult = Future.firstCompletedOf(futureOperations)futureFirstCompletedResult.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")}Future zip VS zipWith

zip用来将两个future结果组合成一个tuple. zipWith则可以自定义Function来处理future返回得结果。

println(s"\nStep 3: Zip the values of the first future with the second future")val donutStockAndPriceOperation = donutStock("vanilla donut") zip donutPrice()donutStockAndPriceOperation.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")}

输出值:

Step 3: Zip the values of the first future with the second futurechecking donut stockResults (Some(10),3.25)

使用zipwith得例子:

println(s"\nStep 4: Call Future.zipWith and pass-through function qtyAndPriceF")val donutAndPriceOperation = donutStock("vanilla donut").zipWith(donutPrice())(qtyAndPriceF)donutAndPriceOperation.onComplete { case Success(result) => println(s"Result $result") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")}

输出结果:

Step 4: Call Future.zipWith and pass-through function qtyAndPriceFchecking donut stockResult (10,3.25)Future andThen

andThen后面可以跟一个自定义得PartialFunction,来处理Future返回得结果, 如下所示:

println(s"\nStep 2: Call Future.andThen with a PartialFunction")val donutStockOperation = donutStock("vanilla donut")donutStockOperation.andThen { case stockQty => println(s"Donut stock qty = $stockQty")}

输出结果:

Step 2: Call Future.andThen with a PartialFunctionchecking donut stockDonut stock qty = Success(10)自定义threadpool

上面得例子中, 我们都是使用了scala得全局ExecutionContext: scala.concurrent.ExecutionContext.Implicits.global.同样得,我们也可以自定义你自己得ExecutionContext。下面是一个使用java.util.concurrent.Executors得例子:

println("Step 1: Define an ExecutionContext") val executor = Executors.newSingleThreadExecutor() implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(executor) println("\nStep 2: Define a method which returns a Future") import scala.concurrent.Future def donutStock(donut: String): Future[Int] = Future { // assume some long running database operation println("checking donut stock") 10 } println("\nStep 3: Call method which returns a Future") val donutStockOperation = donutStock("vanilla donut") donutStockOperation.onComplete { case Success(donutStock) => println(s"Results $donutStock") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") } Thread.sleep(3000) executor.shutdownNow()recover() recoverWith() and fallbackTo()

这三个方法主要用来处理异常得,recover是用来从你已知得异常中恢复,如下所示:

println("\nStep 3: Call Future.recover to recover from a known exception")donutStock("unknown donut") .recover { case e: IllegalStateException if e.getMessage == "Out of stock" => 0 } .onComplete { case Success(donutStock) => println(s"Results $donutStock") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")}

recoverWith()和recover()类似,不同得是他得返回值是一个Future。

println("\nStep 3: Call Future.recoverWith to recover from a known exception")donutStock("unknown donut") .recoverWith { case e: IllegalStateException if e.getMessage == "Out of stock" => Future.successful(0) } .onComplete { case Success(donutStock) => println(s"Results $donutStock") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")}

fallbackTo()是在发生异常时,去调用指定得方法:

println("\nStep 3: Call Future.fallbackTo")val donutStockOperation = donutStock("plain donut") .fallbackTo(similarDonutStock("vanilla donut")) .onComplete { case Success(donutStock) => println(s"Results $donutStock") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")}promise

熟悉ES6得同学可能知道,promise是JS在ES6中引入得新特性,其主要目得是将回调转变成链式调动。

当然scala得promise和ES6得promise还是不一样得,我们看下scala中promise是怎么用得:

println("Step 1: Define a method which returns a Future") import scala.concurrent.ExecutionContext.Implicits.global def donutStock(donut: String): Int = { if(donut == "vanilla donut") 10 else throw new IllegalStateException("Out of stock") } println(s"\nStep 2: Define a Promise of type Int") val donutStockPromise = Promise[Int]() println("\nStep 3: Define a future from Promise") val donutStockFuture = donutStockPromise.future donutStockFuture.onComplete { case Success(stock) => println(s"Stock for vanilla donut = $stock") case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e") } println("\nStep 4: Use Promise.success or Promise.failure to control execution of your future") val donut = "vanilla donut" if(donut == "vanilla donut") { donutStockPromise.success(donutStock(donut)) } else { donutStockPromise.failure(Try(donutStock(donut)).failed.get) } println("\nStep 5: Completing Promise using Promise感谢原创分享者plete() method") val donutStockPromise2 = Promise[Int]() val donutStockFuture2 = donutStockPromise2.future donutStockFuture2.onComplete { case Success(stock) => println(s"Stock for vanilla donut = $stock") case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e") } donutStockPromise2感谢原创分享者plete(Try(donutStock("unknown donut")))

上面例子中我们使用了 Promise.success, Promise.failure, Promise感谢原创分享者plete() 来控制程序得运行。

欢迎感谢对创作者的支持我得公众号:程序那些事,更多精彩等着您!

更多内容请访问:flydean得博客 flydean感谢原创分享者

 
(文/李辉)
打赏
免责声明
• 
本文为李辉原创作品•作者: 李辉。欢迎转载,转载请注明原文出处:http://www.udxd.com/kbzx/show-113667.html 。本文仅代表作者个人观点,本站未对其内容进行核实,请读者仅做参考,如若文中涉及有违公德、触犯法律的内容,一经发现,立即删除,作者需自行承担相应责任。涉及到版权或其他问题,请及时联系我们邮件:weilaitui@qq.com。
 

Copyright©2015-2023 粤公网安备 44030702000869号

粤ICP备16078936号

微信

关注
微信

微信二维码

WAP二维码

客服

联系
客服

联系客服:

24在线QQ: 770665880

客服电话: 020-82301567

E_mail邮箱: weilaitui@qq.com

微信公众号: weishitui

韩瑞 小英 张泽

工作时间:

周一至周五: 08:00 - 24:00

反馈

用户
反馈