U
    luf0                     @   s&  d Z ddlZddlZddlZddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZmZmZmZmZ ddlZejrddlmZmZmZ edZd	d
dddgZG dd deZG dd deZeedeejf ddddZG dd dee ZG dd	 d	ee ZG dd
 d
eZ G dd deZ!dS )a  Asynchronous queues for coroutines. These classes are very similar
to those provided in the standard library's `asyncio package
<https://docs.python.org/3/library/asyncio-queue.html>`_.

.. warning::

   Unlike the standard library's `queue` module, the classes defined here
   are *not* thread-safe. To use these queues from another thread,
   use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
   before calling any queue methods.

    N)genioloop)Future"future_set_result_unless_cancelled)Event)UnionTypeVarGeneric	AwaitableOptional)DequeTupleAny_TQueuePriorityQueue	LifoQueue	QueueFull
QueueEmptyc                   @   s   e Zd ZdZdS )r   z:Raised by `.Queue.get_nowait` when the queue has no items.N__name__
__module____qualname____doc__ r   r   2/tmp/pip-unpacked-wheel-bmg6zs32/tornado/queues.pyr   /   s   c                   @   s   e Zd ZdZdS )r   zBRaised by `.Queue.put_nowait` when a queue is at its maximum size.Nr   r   r   r   r   r   5   s   )futuretimeoutreturnc                    sD   |r@d d fdd}t j || fdd d S )Nr   c                      s      s t  d S N)doneZset_exceptionr   TimeoutErrorr   )r   r   r   
on_timeout@   s    z _set_timeout.<locals>.on_timeoutc                    s
     S r    )Zremove_timeout)_)io_looptimeout_handler   r   <lambda>F       z_set_timeout.<locals>.<lambda>)r   ZIOLoopcurrentZadd_timeoutZadd_done_callback)r   r   r#   r   )r   r%   r&   r   _set_timeout;   s
    
r*   c                   @   s.   e Zd ZdddddZee dddZdS )	_QueueIteratorz	Queue[_T]N)qr   c                 C   s
   || _ d S r    )r,   )selfr,   r   r   r   __init__J   s    z_QueueIterator.__init__r   c                 C   s
   | j  S r    )r,   getr-   r   r   r   	__anext__M   s    z_QueueIterator.__anext__)r   r   r   r.   r
   r   r1   r   r   r   r   r+   I   s   r+   c                   @   s~  e Zd ZdZdZd2eddddZeeddd	Zedd
dZ	e
dddZe
dddZd3eeeeejf  ddddZeddddZd4eeeejf  ee dddZedddZddddZd5eeeejf  ed dddZee dd d!Zddd"d#Zedd$d%Zeddd&d'Zeddd(d)Zddd*d+Z e!dd,d-Z"e!dd.d/Z#e!dd0d1Z$dS )6r   a  Coordinate producer and consumer coroutines.

    If maxsize is 0 (the default) the queue size is unbounded.

    .. testcode::

        import asyncio
        from tornado.ioloop import IOLoop
        from tornado.queues import Queue

        q = Queue(maxsize=2)

        async def consumer():
            async for item in q:
                try:
                    print('Doing work on %s' % item)
                    await asyncio.sleep(0.01)
                finally:
                    q.task_done()

        async def producer():
            for item in range(5):
                await q.put(item)
                print('Put %s' % item)

        async def main():
            # Start consumer without waiting (since it never finishes).
            IOLoop.current().spawn_callback(consumer)
            await producer()     # Wait for producer to put all tasks.
            await q.join()       # Wait for consumer to finish all tasks.
            print('Done')

        asyncio.run(main())

    .. testoutput::

        Put 0
        Put 1
        Doing work on 0
        Put 2
        Doing work on 1
        Put 3
        Doing work on 2
        Put 4
        Doing work on 3
        Doing work on 4
        Done


    In versions of Python without native coroutines (before 3.5),
    ``consumer()`` could be written as::

        @gen.coroutine
        def consumer():
            while True:
                item = yield q.get()
                try:
                    print('Doing work on %s' % item)
                    yield gen.sleep(0.01)
                finally:
                    q.task_done()

    .. versionchanged:: 4.3
       Added ``async for`` support in Python 3.5.

    Nr   )maxsizer   c                 C   sb   |d krt d|dk r td|| _|   tg | _tg | _d| _t	 | _
| j
  d S )Nzmaxsize can't be Noner   zmaxsize can't be negative)	TypeError
ValueError_maxsize_initcollectionsdeque_getters_putters_unfinished_tasksr   	_finishedset)r-   r2   r   r   r   r.      s    zQueue.__init__r   c                 C   s   | j S )z%Number of items allowed in the queue.)r5   r0   r   r   r   r2      s    zQueue.maxsizec                 C   s
   t | jS )zNumber of items in the queue.)len_queuer0   r   r   r   qsize   s    zQueue.qsizec                 C   s   | j  S r    r?   r0   r   r   r   empty   s    zQueue.emptyc                 C   s    | j dkrdS |  | j kS d S )Nr   F)r2   r@   r0   r   r   r   full   s    
z
Queue.fullzFuture[None])itemr   r   c                 C   sR   t  }z| | W n. tk
rB   | j||f t|| Y nX |d |S )a  Put an item into the queue, perhaps waiting until there is room.

        Returns a Future, which raises `tornado.util.TimeoutError` after a
        timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.
        N)r   
put_nowaitr   r:   appendr*   
set_result)r-   rD   r   r   r   r   r   put   s    
z	Queue.putrD   r   c                 C   s^   |    | jrB|  std| j }| | t||   n|  rPt	n
| | dS )z{Put an item into the queue without blocking.

        If no free slot is immediately available, raise `QueueFull`.
        z)queue non-empty, why are getters waiting?N)
_consume_expiredr9   rB   AssertionErrorpopleft_Queue__put_internalr   _getrC   r   )r-   rD   getterr   r   r   rE      s    

zQueue.put_nowait)r   r   c                 C   sH   t  }z||   W n* tk
rB   | j| t|| Y nX |S )a.  Remove and return an item from the queue.

        Returns an awaitable which resolves once an item is available, or raises
        `tornado.util.TimeoutError` after a timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.

        .. note::

           The ``timeout`` argument of this method differs from that
           of the standard library's `queue.Queue.get`. That method
           interprets numeric values as relative timeouts; this one
           interprets them as absolute deadlines and requires
           ``timedelta`` objects for relative timeouts (consistent
           with other timeouts in Tornado).

        )r   rG   
get_nowaitr   r9   rF   r*   )r-   r   r   r   r   r   r/      s    z	Queue.getc                 C   s`   |    | jrH|  std| j \}}| | t|d |  S |  rX|  S t	dS )zRemove and return an item from the queue without blocking.

        Return an item if one is immediately available, else raise
        `QueueEmpty`.
        z(queue not full, why are putters waiting?N)
rJ   r:   rC   rK   rL   rM   r   rN   r@   r   )r-   rD   Zputterr   r   r   rP      s    

zQueue.get_nowaitc                 C   s8   | j dkrtd|  j d8  _ | j dkr4| j  dS )a  Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each `.get` used to fetch a task, a
        subsequent call to `.task_done` tells the queue that the processing
        on the task is complete.

        If a `.join` is blocking, it resumes when all items have been
        processed; that is, when every `.put` is matched by a `.task_done`.

        Raises `ValueError` if called more times than `.put`.
        r   z!task_done() called too many times   N)r;   r4   r<   r=   r0   r   r   r   	task_done  s
    

zQueue.task_donec                 C   s   | j |S )zBlock until all items in the queue are processed.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        )r<   wait)r-   r   r   r   r   join$  s    z
Queue.joinc                 C   s   t | S r    )r+   r0   r   r   r   	__aiter__.  s    zQueue.__aiter__c                 C   s   t  | _d S r    )r7   r8   r?   r0   r   r   r   r6   2  s    zQueue._initc                 C   s
   | j  S r    )r?   rL   r0   r   r   r   rN   5  s    z
Queue._getc                 C   s   | j | d S r    r?   rF   r-   rD   r   r   r   _put8  s    z
Queue._putc                 C   s&   |  j d7  _ | j  | | d S )NrQ   )r;   r<   clearrX   rW   r   r   r   Z__put_internal=  s    
zQueue.__put_internalc                 C   sH   | j r$| j d d  r$| j   q | jrD| jd  rD| j  q$d S )Nr   rQ   )r:   r!   rL   r9   r0   r   r   r   rJ   B  s    zQueue._consume_expiredc                 C   s    dt | jtt| |  f S )Nz<%s at %s %s>)typer   hexid_formatr0   r   r   r   __repr__J  s    zQueue.__repr__c                 C   s   dt | j|  f S )Nz<%s %s>)rZ   r   r]   r0   r   r   r   __str__M  s    zQueue.__str__c                 C   sn   d| j f }t| dd r&|d| j 7 }| jr>|dt| j 7 }| jrV|dt| j 7 }| jrj|d| j 7 }|S )Nz
maxsize=%rr?   z	 queue=%rz getters[%s]z putters[%s]z	 tasks=%s)r2   getattrr?   r9   r>   r:   r;   )r-   resultr   r   r   r]   P  s    zQueue._format)r   )N)N)N)%r   r   r   r   r?   intr.   propertyr2   r@   boolrB   rC   r   r   r   floatdatetime	timedeltarH   rE   r
   r/   rP   rR   rT   r+   rU   r6   rN   rX   rM   rJ   strr^   r_   r]   r   r   r   r   r   Q   sH   E    
c                   @   s<   e Zd ZdZddddZeddddZedd	d
ZdS )r   a  A `.Queue` that retrieves entries in priority order, lowest first.

    Entries are typically tuples like ``(priority number, data)``.

    .. testcode::

        import asyncio
        from tornado.queues import PriorityQueue

        async def main():
            q = PriorityQueue()
            q.put((1, 'medium-priority item'))
            q.put((0, 'high-priority item'))
            q.put((10, 'low-priority item'))

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        (0, 'high-priority item')
        (1, 'medium-priority item')
        (10, 'low-priority item')
    Nr   c                 C   s
   g | _ d S r    rA   r0   r   r   r   r6   z  s    zPriorityQueue._initrI   c                 C   s   t | j| d S r    )heapqheappushr?   rW   r   r   r   rX   }  s    zPriorityQueue._putc                 C   s   t | jS r    )ri   heappopr?   r0   r   r   r   rN     s    zPriorityQueue._getr   r   r   r   r6   r   rX   rN   r   r   r   r   r   ]  s   c                   @   s<   e Zd ZdZddddZeddddZedd	d
ZdS )r   a  A `.Queue` that retrieves the most recently put items first.

    .. testcode::

        import asyncio
        from tornado.queues import LifoQueue

        async def main():
            q = LifoQueue()
            q.put(3)
            q.put(2)
            q.put(1)

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        1
        2
        3
    Nr   c                 C   s
   g | _ d S r    rA   r0   r   r   r   r6     s    zLifoQueue._initrI   c                 C   s   | j | d S r    rV   rW   r   r   r   rX     s    zLifoQueue._putc                 C   s
   | j  S r    )r?   popr0   r   r   r   rN     s    zLifoQueue._getrl   r   r   r   r   r     s   )"r   r7   rf   ri   Ztornador   r   Ztornado.concurrentr   r   Ztornado.locksr   typingr   r   r	   r
   r   TYPE_CHECKINGr   r   r   r   __all__	Exceptionr   r   re   rg   r*   r+   r   r   r   r   r   r   r   <module>   s0      '