def run(self): """Starts or resumes the generator, running until it reaches a yield point that is not ready. """ if self.running or self.finished: return try: self.running = True while True: future = self.future
# 當(dāng)前 future 沒有完成時(shí)直接返回,等待 IOLoop 在 future 完成后回調(diào)再執(zhí)行 if not future.done(): return
# Tornado 4.0 之前使用 YieldPoint 驅(qū)動,Callback 與 Wait/WaitAll # 協(xié)調(diào)時(shí),Callback 的回調(diào)結(jié)果需要 Runner 作為中轉(zhuǎn)站,通過 # Runner.register_callback(key) 登記 Callback ,再通過 # YieldPoint.result_callback(key) 取回“設(shè)置(回調(diào))方法”, # 外部通過“設(shè)置(回調(diào))方法”把結(jié)果保存到 Runner.results 字典中。 # Wait/WaitAll 通過 get_result(key) 取回 結(jié)果。 # YieldFuture 的實(shí)現(xiàn)也采用了相同的實(shí)現(xiàn)方式。 # Tornado 4.0 之后使用 Future 代替 YieldPoint,這些已經(jīng)過時(shí)。 # 與 Yield 相關(guān)的代碼都是為了向后兼容。 if self.pending_callbacks and not self.had_exception: # If we ran cleanly without waiting on all callbacks # raise an error (really more of a warning). If we # had an exception then some callbacks may have been # orphaned, so skip the check in that case. raise LeakedCallbackError( "finished without waiting for callbacks %r" % self.pending_callbacks) self.result_future.set_result(getattr(e, 'value', None)) self.result_future = None self._deactivate_stack_context() return except Exception: self.finished = True self.future = _null_future self.result_future.set_exc_info(sys.exc_info()) self.result_future = None self._deactivate_stack_context() return
# 繼續(xù)處理 yield 表達(dá)式結(jié)果 if not self.handle_yield(yielded): return finally: self.running = False
def handle_yield(self, yielded): # 為了保持向后兼容,需要對多個 YieldPonit 和 Future 的混合集合做處理。 # 對于全是 Future 的集合類型使用新的 multi_future 函數(shù)進(jìn)行封裝處理; # 不全是的使用 Multi 類進(jìn)行封裝,對于 Future 提供了 YieldFuture 適配器類。 # 詳細(xì)的實(shí)現(xiàn)細(xì)節(jié)見 YieldFuture、Multi的實(shí)現(xiàn)代碼。 # 若需要 run() 循環(huán)立即處理該 YieldPoint(被啟動)/Future(已經(jīng)完成) 則返 # 回 True,否則返回 False。 if isinstance(yielded, list): if all(is_future(f) for f in yielded): yielded = multi_future(yielded) else: yielded = Multi(yielded) elif isinstance(yielded, dict): if all(is_future(f) for f in yielded.values()): yielded = multi_future(yielded) else: yielded = Multi(yielded)
# 針對第一個 YieldPoint 使用一個 ExceptionStackContext 上下文來處理 # StackContexts 中沒有處理的異常,將未處理的異常記錄到 result_future 中。 # 對于 Future 對象則沒有必要, Future 提供了方法來記錄異常和異常堆棧信息, # 在 Future 完成后通過其 result() 方法獲取結(jié)果(在 run 方法的調(diào)用)時(shí)會 # 再次拋出異常,這時(shí)可捕獲記錄到 result_future 中。 if isinstance(yielded, YieldPoint): self.future = TracebackFuture() def start_yield_point(): try: yielded.start(self) # 如果 yielded 已經(jīng)完成,則將其結(jié)果賦值給 self.future,等待 run 循環(huán)處理; # 若未就緒,則需要通過 Runner.set_result(key, value) 來進(jìn)行賦值操作。 if yielded.is_ready(): self.future.set_result( yielded.get_result()) else: self.yield_point = yielded except Exception: self.future = TracebackFuture() self.future.set_exc_info(sys.exc_info()) if self.stack_context_deactivate is None: # Start a stack context if this is the first # YieldPoint we've seen. with stack_context.ExceptionStackContext( self.handle_exception) as deactivate: self.stack_context_deactivate = deactivate def cb(): start_yield_point() self.run() # 第 1 個 yielded 交由 IOLoop來啟動 self.io_loop.add_callback(cb) return False else: # 啟動 YieldPoint,需要返回 True,在 run 循環(huán)中繼續(xù)處理 start_yield_point() elif is_future(yielded): self.future = yielded # self.future 完成后繼續(xù) self.run() # moment = Future() 是一個特殊的對象,主要用在需要長時(shí)間執(zhí)行的 coroutine 中, # 通過 “yield gen.moment” 中斷當(dāng)前 coroutine ,將控制權(quán)交給 IOLoop 去輪詢。 # 等效于當(dāng)前 coroutine 臨時(shí)放棄時(shí)間片,給了其他 callback 機(jī)會運(yùn)行。 if not self.future.done() or self.future is moment: self.io_loop.add_future( self.future, lambda f: self.run()) return False else: self.future = TracebackFuture() self.future.set_exception(BadYieldError( "yielded unknown object %r" % (yielded,))) return True
def __init__(self, gen, result_future, first_yielded): self.gen = genreturn_futurereturn_future self.result_future = result_future self.future = _null_future self.yield_point = None self.pending_callbacks = None self.results = None self.running = False self.finished = False self.had_exception = False self.io_loop = IOLoop.current() # For efficiency, we do not create a stack context until we # reach a YieldPoint (stack contexts are required for the historical # semantics of YieldPoints, but not for Futures). When we have # done so, this field will be set and must be called at the end # of the coroutine. self.stack_context_deactivate = None if self.handle_yield(first_yielded): self.run()
def _argument_adapter(callback): """Returns a function that when invoked runs ``callback`` with one arg.
If the function returned by this function is called with exactly one argument, that argument is passed to ``callback``. Otherwise the args tuple and kwargs dict are wrapped in an `Arguments` object. """ def wrapper(*args, **kwargs): if kwargs or len(args) > 1: callback(Arguments(args, kwargs)) elif args: callback(args[0]) else: callback(None) return wrapper