U
    lufC                     @   s   d dl Z d dlZd dlZd dlmZmZ d dlmZmZ d dl	m
Z
mZmZmZmZ d dl	Z	e	jrrd dl	mZmZ dddd	d
gZG dd deZG dd deZG dd deZG dd deZG dd deZG dd	 d	eZG dd
 d
eZdS )    N)genioloop)Future"future_set_result_unless_cancelled)UnionOptionalTypeAny	Awaitable)DequeSet	ConditionEvent	SemaphoreBoundedSemaphoreLockc                   @   s,   e Zd ZdZddddZddddZdS )_TimeoutGarbageCollectorzBase class for objects that periodically clean up timed-out waiters.

    Avoids memory leak in a common pattern like:

        while True:
            yield condition.wait(short_timeout)
            print('looping....')
    Nreturnc                 C   s   t  | _d| _d S )Nr   )collectionsdeque_waiters	_timeoutsself r   1/tmp/pip-unpacked-wheel-bmg6zs32/tornado/locks.py__init__)   s    
z!_TimeoutGarbageCollector.__init__c                 C   s:   |  j d7  _ | j dkr6d| _ tdd | jD | _d S )N   d   r   c                 s   s   | ]}|  s|V  qd S N)done).0wr   r   r   	<genexpr>2   s      z<_TimeoutGarbageCollector._garbage_collect.<locals>.<genexpr>)r   r   r   r   r   r   r   r   _garbage_collect-   s    
z)_TimeoutGarbageCollector._garbage_collect)__name__
__module____qualname____doc__r   r%   r   r   r   r   r      s   	r   c                   @   sb   e Zd ZdZedddZdeeee	j
f  ee dddZdedd
ddZddddZdS )r   a  A condition allows one or more coroutines to wait until notified.

    Like a standard `threading.Condition`, but does not need an underlying lock
    that is acquired and released.

    With a `Condition`, coroutines can wait to be notified by other coroutines:

    .. testcode::

        import asyncio
        from tornado import gen
        from tornado.locks import Condition

        condition = Condition()

        async def waiter():
            print("I'll wait right here")
            await condition.wait()
            print("I'm done waiting")

        async def notifier():
            print("About to notify")
            condition.notify()
            print("Done notifying")

        async def runner():
            # Wait for waiter() and notifier() in parallel
            await gen.multi([waiter(), notifier()])

        asyncio.run(runner())

    .. testoutput::

        I'll wait right here
        About to notify
        Done notifying
        I'm done waiting

    `wait` takes an optional ``timeout`` argument, which is either an absolute
    timestamp::

        io_loop = IOLoop.current()

        # Wait up to 1 second for a notification.
        await condition.wait(timeout=io_loop.time() + 1)

    ...or a `datetime.timedelta` for a timeout relative to the current time::

        # Wait up to 1 second.
        await condition.wait(timeout=datetime.timedelta(seconds=1))

    The method returns False if there's no notification before the deadline.

    .. versionchanged:: 5.0
       Previously, waiters could be notified synchronously from within
       `notify`. Now, the notification will always be received on the
       next iteration of the `.IOLoop`.
    r   c                 C   s.   d| j jf }| jr&|dt| j 7 }|d S )Nz<%sz waiters[%s]>)	__class__r&   r   len)r   resultr   r   r   __repr__q   s    zCondition.__repr__Ntimeoutr   c                    sX   t  j |rTddfdd}tj   || fdd S )zWait for `.notify`.

        Returns a `.Future` that resolves ``True`` if the condition is notified,
        or ``False`` after a timeout.
        Nr   c                      s     std    d S NF)r!   r   r%   r   r   waiterr   r   
on_timeout   s    
z"Condition.wait.<locals>.on_timeoutc                    s
     S r    Zremove_timeout_io_looptimeout_handler   r   <lambda>       z Condition.wait.<locals>.<lambda>)r   r   appendr   IOLoopcurrentadd_timeoutadd_done_callbackr   r0   r4   r   r9   r   r:   r3   r   waitw   s    
zCondition.waitr   )nr   c                 C   sL   g }|r4| j r4| j  }| s|d8 }|| q|D ]}t|d q8dS )zWake ``n`` waiters.r   TN)r   popleftr!   r=   r   )r   rE   waitersr3   r   r   r   notify   s    

zCondition.notifyc                 C   s   |  t| j dS )zWake all waiters.N)rH   r,   r   r   r   r   r   
notify_all   s    zCondition.notify_all)N)r   )r&   r'   r(   r)   strr.   r   r   floatdatetime	timedeltar
   boolrD   intrH   rI   r   r   r   r   r   5   s   ; c                   @   sz   e Zd ZdZddddZedddZeddd	Zddd
dZ	ddddZ
deeeejf  ed dddZdS )r   a  An event blocks coroutines until its internal flag is set to True.

    Similar to `threading.Event`.

    A coroutine can wait for an event to be set. Once it is set, calls to
    ``yield event.wait()`` will not block unless the event has been cleared:

    .. testcode::

        import asyncio
        from tornado import gen
        from tornado.locks import Event

        event = Event()

        async def waiter():
            print("Waiting for event")
            await event.wait()
            print("Not waiting this time")
            await event.wait()
            print("Done")

        async def setter():
            print("About to set the event")
            event.set()

        async def runner():
            await gen.multi([waiter(), setter()])

        asyncio.run(runner())

    .. testoutput::

        Waiting for event
        About to set the event
        Not waiting this time
        Done
    Nr   c                 C   s   d| _ t | _d S r1   )_valuesetr   r   r   r   r   r      s    zEvent.__init__c                 C   s   d| j j|  rdndf S )Nz<%s %s>rQ   clear)r+   r&   is_setr   r   r   r   r.      s    zEvent.__repr__c                 C   s   | j S )z-Return ``True`` if the internal flag is true.rP   r   r   r   r   rS      s    zEvent.is_setc                 C   s.   | j s*d| _ | jD ]}| s|d qdS )zSet the internal flag to ``True``. All waiters are awakened.

        Calling `.wait` once the flag is set will not block.
        TN)rP   r   r!   
set_result)r   futr   r   r   rQ      s
    
z	Event.setc                 C   s
   d| _ dS )zkReset the internal flag to ``False``.

        Calls to `.wait` will block until `.set` is called.
        FNrT   r   r   r   r   rR      s    zEvent.clearr/   c                    sj   t   jr d  S j   fdd |dkrD S t| }| fdd |S dS )zBlock until the internal flag is true.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        Nc                    s    j | S r    )r   removerV   r   r   r   r;      r<   zEvent.wait.<locals>.<lambda>c                    s      s  S d S r    )r!   cancel)tfrX   r   r   r;      r<   )r   rP   rU   r   addrA   r   Zwith_timeout)r   r0   Ztimeout_futr   )rV   r   r   rD      s    

z
Event.wait)N)r&   r'   r(   r)   r   rJ   r.   rN   rS   rQ   rR   r   r   rK   rL   rM   r
   rD   r   r   r   r   r      s   ' c                   @   sL   e Zd ZdZeddddZddddZd	ee ee	j
 dd
ddZdS )_ReleasingContextManagerzReleases a Lock or Semaphore at the end of a "with" statement.

    with (yield semaphore.acquire()):
        pass

    # Now semaphore.release() has been called.
    N)objr   c                 C   s
   || _ d S r    )_obj)r   r]   r   r   r   r     s    z!_ReleasingContextManager.__init__r   c                 C   s   d S r    r   r   r   r   r   	__enter__  s    z"_ReleasingContextManager.__enter__Optional[Type[BaseException]])exc_typeexc_valexc_tbr   c                 C   s   | j   d S r    )r^   release)r   ra   rb   rc   r   r   r   __exit__  s    z!_ReleasingContextManager.__exit__)r&   r'   r(   r)   r	   r   r_   r   BaseExceptiontypesTracebackTypere   r   r   r   r   r\     s   r\   c                       s   e Zd ZdZdedd fddZed fdd	Zddd
dZde	e
eejf  ee dddZddddZde	e e	ej ddddZddddZde	e e	ej ddddZ  ZS )r   a  A lock that can be acquired a fixed number of times before blocking.

    A Semaphore manages a counter representing the number of `.release` calls
    minus the number of `.acquire` calls, plus an initial value. The `.acquire`
    method blocks if necessary until it can return without making the counter
    negative.

    Semaphores limit access to a shared resource. To allow access for two
    workers at a time:

    .. testsetup:: semaphore

       from collections import deque

       from tornado import gen
       from tornado.ioloop import IOLoop
       from tornado.concurrent import Future

       inited = False

       async def simulator(futures):
           for f in futures:
               # simulate the asynchronous passage of time
               await gen.sleep(0)
               await gen.sleep(0)
               f.set_result(None)

       def use_some_resource():
           global inited
           global futures_q
           if not inited:
               inited = True
               # Ensure reliable doctest output: resolve Futures one at a time.
               futures_q = deque([Future() for _ in range(3)])
               IOLoop.current().add_callback(simulator, list(futures_q))

           return futures_q.popleft()

    .. testcode:: semaphore

        import asyncio
        from tornado import gen
        from tornado.locks import Semaphore

        sem = Semaphore(2)

        async def worker(worker_id):
            await sem.acquire()
            try:
                print("Worker %d is working" % worker_id)
                await use_some_resource()
            finally:
                print("Worker %d is done" % worker_id)
                sem.release()

        async def runner():
            # Join all workers.
            await gen.multi([worker(i) for i in range(3)])

        asyncio.run(runner())

    .. testoutput:: semaphore

        Worker 0 is working
        Worker 1 is working
        Worker 0 is done
        Worker 2 is working
        Worker 1 is done
        Worker 2 is done

    Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
    the semaphore has been released once, by worker 0.

    The semaphore can be used as an async context manager::

        async def worker(worker_id):
            async with sem:
                print("Worker %d is working" % worker_id)
                await use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    For compatibility with older versions of Python, `.acquire` is a
    context manager, so ``worker`` could also be written as::

        @gen.coroutine
        def worker(worker_id):
            with (yield sem.acquire()):
                print("Worker %d is working" % worker_id)
                yield use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    r   Nvaluer   c                    s$   t    |dk rtd|| _d S )Nr   z$semaphore initial value must be >= 0)superr   
ValueErrorrP   r   rj   r+   r   r   r     s    
zSemaphore.__init__r   c                    sP   t   }| jdkrdn
d| j}| jr<d|t| j}d|dd |S )Nr   lockedzunlocked,value:{0}z{0},waiters:{1}z<{0} [{1}]>r   )rk   r.   rP   formatr   r,   )r   resextrarn   r   r   r.     s    
zSemaphore.__repr__c                 C   sJ   |  j d7  _ | jrF| j }| s|  j d8  _ |t|  qFqdS )*Increment the counter and wake one waiter.r   N)rP   r   rF   r!   rU   r\   r2   r   r   r   rd     s    
zSemaphore.releaser/   c                    s   t  jdkr. jd8  _t nNj |r|ddfdd}tj   	||
 fdd S )	zDecrement the counter. Returns an awaitable.

        Block if the counter is zero and wait for a `.release`. The awaitable
        raises `.TimeoutError` after the deadline.
        r   r   Nr   c                      s"     st     d S r    )r!   Zset_exceptionr   TimeoutErrorr%   r   r2   r   r   r4     s    z%Semaphore.acquire.<locals>.on_timeoutc                    s
     S r    r5   r6   r8   r   r   r;     r<   z#Semaphore.acquire.<locals>.<lambda>)r   rP   rU   r\   r   r=   r   r>   r?   r@   rA   rB   r   rC   r   acquire  s    

zSemaphore.acquirec                 C   s   t dd S )Nz0Use 'async with' instead of 'with' for SemaphoreRuntimeErrorr   r   r   r   r_     s    zSemaphore.__enter__r`   )typrj   	tracebackr   c                 C   s   |    d S r    r_   )r   ry   rj   rz   r   r   r   re     s    zSemaphore.__exit__c                    s   |   I d H  d S r    rv   r   r   r   r   
__aenter__  s    zSemaphore.__aenter__ry   rj   tbr   c                    s   |    d S r    rd   r   ry   rj   r   r   r   r   	__aexit__  s    zSemaphore.__aexit__)r   )N)r&   r'   r(   r)   rO   r   rJ   r.   rd   r   r   rK   rL   rM   r
   r\   rv   r_   rf   rg   rh   re   r}   r   __classcell__r   r   rn   r   r     s*   d	 c                       s<   e Zd ZdZd
edd fddZdd fdd	Z  ZS )r   a:  A semaphore that prevents release() being called too many times.

    If `.release` would increment the semaphore's value past the initial
    value, it raises `ValueError`. Semaphores are mostly used to guard
    resources with limited capacity, so a semaphore released too many times
    is a sign of a bug.
    r   Nri   c                    s   t  j|d || _d S )Nrj   )rk   r   _initial_valuerm   rn   r   r   r     s    zBoundedSemaphore.__init__r   c                    s"   | j | jkrtdt   dS )rt   z!Semaphore released too many timesN)rP   r   rl   rk   rd   r   rn   r   r   rd     s    zBoundedSemaphore.release)r   )r&   r'   r(   r)   rO   r   rd   r   r   r   rn   r   r     s   c                   @   s   e Zd ZdZddddZedddZdeee	e
jf  ee dd	d
ZddddZddddZdee eej ddddZddddZdee eej ddddZdS )r   a  A lock for coroutines.

    A Lock begins unlocked, and `acquire` locks it immediately. While it is
    locked, a coroutine that yields `acquire` waits until another coroutine
    calls `release`.

    Releasing an unlocked lock raises `RuntimeError`.

    A Lock can be used as an async context manager with the ``async
    with`` statement:

    >>> from tornado import locks
    >>> lock = locks.Lock()
    >>>
    >>> async def f():
    ...    async with lock:
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    For compatibility with older versions of Python, the `.acquire`
    method asynchronously returns a regular context manager:

    >>> async def f2():
    ...    with (yield lock.acquire()):
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    Nr   c                 C   s   t dd| _d S )Nr   r   )r   _blockr   r   r   r   r     s    zLock.__init__c                 C   s   d| j j| jf S )Nz<%s _block=%s>)r+   r&   r   r   r   r   r   r.     s    zLock.__repr__r/   c                 C   s   | j |S )zAttempt to lock. Returns an awaitable.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        )r   rv   )r   r0   r   r   r   rv     s    zLock.acquirec                 C   s0   z| j   W n tk
r*   tdY nX dS )zUnlock.

        The first coroutine in line waiting for `acquire` gets the lock.

        If not locked, raise a `RuntimeError`.
        zrelease unlocked lockN)r   rd   rl   rx   r   r   r   r   rd     s    zLock.releasec                 C   s   t dd S )Nz+Use `async with` instead of `with` for Lockrw   r   r   r   r   r_   (  s    zLock.__enter__r`   r~   c                 C   s   |    d S r    r{   r   r   r   r   re   +  s    zLock.__exit__c                    s   |   I d H  d S r    r|   r   r   r   r   r}   3  s    zLock.__aenter__c                    s   |    d S r    r   r   r   r   r   r   6  s    zLock.__aexit__)N)r&   r'   r(   r)   r   rJ   r.   r   r   rK   rL   rM   r
   r\   rv   rd   r_   rf   rg   rh   re   r}   r   r   r   r   r   r     s*   $ 
)r   rL   rg   Ztornador   r   Ztornado.concurrentr   r   typingr   r   r   r	   r
   TYPE_CHECKINGr   r   __all__objectr   r   r   r\   r   r   r   r   r   r   r   <module>   s"   id :