여러 선물을 기다리는 방법
내가 여러 개의 미래를 가지고 있고 그중 하나 가 실패 하거나 모두 성공할 때까지 기다려야한다고 가정 해보자 .
예를 들면 다음과 같습니다하자 3 개 선물이있다 : f1
, f2
, f3
.
경우
f1
성공 및f2
실패 나는 기다리지 않는다f3
(반환 실패를 클라이언트로).경우
f2
동안 실패f1
하고f3
아직 가동 나는 그들 (그리고 반환을 기다리지 않는다 실패 )경우
f1
성공 후f2
성공 나는 기다리고 계속f3
.
어떻게 구현 하시겠습니까?
대신 다음과 같이 이해를 위해 사용할 수 있습니다.
val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}
val aggFut = for{
f1Result <- fut1
f2Result <- fut2
f3Result <- fut3
} yield (f1Result, f2Result, f3Result)
이 예에서 선물 1, 2, 3은 병렬로 시작됩니다. 그런 다음 for comprehension에서 결과 1, 2, 3이 나올 때까지 기다립니다. 1 또는 2가 실패하면 더 이상 3을 기다리지 않습니다. 3 개가 모두 성공하면 aggFut
val은 3 개의 Future의 결과에 따라 3 개의 슬롯이있는 튜플을 보유합니다.
이제 fut2가 먼저 실패한다고 말하면 기다리지 않으려는 동작이 필요하면 상황이 조금 더 까다로워집니다. 위의 예에서 fut2가 실패했음을 인식하기 전에 fut1이 완료 될 때까지 기다려야합니다. 이를 해결하기 위해 다음과 같이 시도 할 수 있습니다.
val fut1 = Future{Thread.sleep(3000);1}
val fut2 = Promise.failed(new RuntimeException("boo")).future
val fut3 = Future{Thread.sleep(1000);3}
def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
val fut = if (futures.size == 1) futures.head._2
else Future.firstCompletedOf(futures.values)
fut onComplete{
case Success(value) if (futures.size == 1)=>
prom.success(value :: values)
case Success(value) =>
processFutures(futures - value, value :: values, prom)
case Failure(ex) => prom.failure(ex)
}
prom.future
}
val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
aggFut onComplete{
case value => println(value)
}
이제 이것은 올바르게 작동하지만 문제는 성공적으로 완료되었을 때 Future
제거 할 항목을 아는 데서 발생 Map
합니다. 결과를 해당 결과를 생성 한 Future와 적절하게 연관시킬 수있는 방법이있는 한, 이와 같은 것이 작동합니다. 그것은 단지 재귀 적으로지도에서 완성 된 Futures를 제거 Future.firstCompletedOf
하고 남은 Futures
것이 없을 때까지 나머지 를 호출 하여 결과를 수집합니다. 예쁘지는 않지만 이야기하는 행동이 정말로 필요하다면 이것 또는 비슷한 것이 작동 할 수 있습니다.
약속을 사용하고 첫 번째 실패 또는 마지막으로 완료된 집계 성공을 보낼 수 있습니다.
def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
val p = Promise[M[A]]()
// the first Future to fail completes the promise
in.foreach(_.onFailure{case i => p.tryFailure(i)})
// if the whole sequence succeeds (i.e. no failures)
// then the promise is completed with the aggregated success
Future.sequence(in).foreach(p trySuccess _)
p.future
}
그런 다음 차단하려는 경우 Await
그 결과 Future
를 얻거나 map
다른 것으로 할 수 있습니다.
for comprehension과의 차이점은 여기서 첫 번째 오류가 실패하는 반면, comprehension을 사용하면 입력 컬렉션의 순회 순서에서 첫 번째 오류가 발생합니다 (다른 것이 먼저 실패하더라도). 예를 들면 :
val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }
Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order
과:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }
sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the 'actual' first to fail (usually...)
// and it returns early (it does not wait 1 sec)
액터를 사용하지 않는 솔루션입니다.
import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger
// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
val remaining = new AtomicInteger(fs.length)
val p = promise[T]
fs foreach {
_ onComplete {
case s @ Success(_) => {
if (remaining.decrementAndGet() == 0) {
// Arbitrarily return the final success
p tryComplete s
}
}
case f @ Failure(_) => {
p tryComplete f
}
}
}
p.future
}
선물만으로도 할 수 있습니다. 다음은 한 가지 구현입니다. 실행이 일찍 종료되지는 않습니다! 이 경우 좀 더 정교한 작업을 수행해야합니다 (그리고 직접 중단을 구현할 수도 있습니다). 그러나 작동하지 않을 일을 계속 기다리고 싶지 않다면, 핵심은 첫 번째 일이 끝날 때까지 계속 기다렸다가 아무것도 남지 않거나 예외가 발생하면 중지하는 것입니다.
import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global
@tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()):
Either[Throwable, Seq[A]] = {
val first = Future.firstCompletedOf(fs)
Await.ready(first, Duration.Inf).value match {
case None => awaitSuccess(fs, done) // Shouldn't happen!
case Some(Failure(e)) => Left(e)
case Some(Success(_)) =>
val (complete, running) = fs.partition(_.isCompleted)
val answers = complete.flatMap(_.value)
answers.find(_.isFailure) match {
case Some(Failure(e)) => Left(e)
case _ =>
if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
else Right( answers.map(_.get) ++: done )
}
}
}
다음은 모든 것이 정상적으로 작동 할 때 작동하는 예입니다.
scala> awaitSuccess(Seq(Future{ println("Hi!") },
Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
Fancy meeting you here!
Bye!
res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
그러나 무언가 잘못되면 :
scala> awaitSuccess(Seq(Future{ println("Hi!") },
Future{ Thread.sleep(1000); throw new Exception("boo"); () },
Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)
scala> Bye!
For this purpose I would use an Akka actor. Unlike the for-comprehension, it fails as soon as any of the futures fail, so it's a bit more efficient in that sense.
class ResultCombiner(futs: Future[_]*) extends Actor {
var origSender: ActorRef = null
var futsRemaining: Set[Future[_]] = futs.toSet
override def receive = {
case () =>
origSender = sender
for(f <- futs)
f.onComplete(result => self ! if(result.isSuccess) f else false)
case false =>
origSender ! SomethingFailed
case f: Future[_] =>
futsRemaining -= f
if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
}
}
sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result
Then, create the actor, send a message to it (so that it will know where to send its reply to) and wait for a reply.
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
val f4: Future[Result] = actor ? ()
implicit val timeout = new Timeout(30 seconds) // or whatever
Await.result(f4, timeout.duration).asInstanceOf[Result] match {
case SomethingFailed => println("Oh noes!")
case EverythingSucceeded => println("It all worked!")
}
} finally {
// Avoid memory leaks: destroy the actor
actor ! PoisonPill
}
This question has been answered but I am posting my value class solution (value classes were added in 2.10) since there isn't one here. Please feel free to criticize.
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
def concurrently = ConcurrentFuture(self)
}
case class ConcurrentFuture[A](future: Future[A]) extends AnyVal {
def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future))
def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class
}
def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = {
val p = Promise[B]()
val inner = f(outer.future)
inner.future onFailure { case t => p.tryFailure(t) }
outer.future onFailure { case t => p.tryFailure(t) }
inner.future onSuccess { case b => p.trySuccess(b) }
ConcurrentFuture(p.future)
}
ConcurrentFuture is a no overhead Future wrapper that changes the default Future map/flatMap from do-this-then-that to combine-all-and-fail-if-any-fail. Usage:
def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 }
def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" }
def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 }
val f : Future[(Int,String,Double)] = {
for {
f1 <- func1.concurrently
f2 <- func2.concurrently
f3 <- func3.concurrently
} yield for {
v1 <- f1
v2 <- f2
v3 <- f3
} yield (v1,v2,v3)
}.future
f.onFailure { case t => println("future failed $t") }
In the example above, f1,f2 and f3 will run concurrently and if any fail in any order the future of the tuple will fail immediately.
You might want to checkout Twitter's Future API. Notably the Future.collect method. It does exactly what you want: https://twitter.github.io/scala_school/finagle.html
The source code Future.scala is available here: https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala
You can use this:
val l = List(1, 6, 8)
val f = l.map{
i => future {
println("future " +i)
Thread.sleep(i* 1000)
if (i == 12)
throw new Exception("6 is not legal.")
i
}
}
val f1 = Future.sequence(f)
f1 onSuccess{
case l => {
logInfo("onSuccess")
l.foreach(i => {
logInfo("h : " + i)
})
}
}
f1 onFailure{
case l => {
logInfo("onFailure")
}
참고URL : https://stackoverflow.com/questions/16256279/how-to-wait-for-several-futures
'developer tip' 카테고리의 다른 글
CMake에서 문자열을 여러 줄로 분할하는 방법은 무엇입니까? (0) | 2020.09.25 |
---|---|
Google Play에서 언어를 변경하려면 어떻게하나요? (0) | 2020.09.25 |
이 Java 8 람다가 컴파일에 실패하는 이유는 무엇입니까? (0) | 2020.09.24 |
WPF에서 GridViewColumn 데이터를 자동 크기 조정하고 오른쪽 정렬하는 방법은 무엇입니까? (0) | 2020.09.24 |
Xcode-모든 파일에서 텍스트 검색 (0) | 2020.09.24 |