U
    luf>Z                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	m
Z
mZmZmZmZmZmZmZmZmZmZmZ ddlZddlmZmZmZ ddlmZ G d	d
 d
eZG dd dZG dd deejZ G dd dZ!edddZ"G dd deej#e Z$dS )z%Future-returning APIs for coroutines.    N)Future)deque)chain)Any	AwaitableCallableDictList
NamedTupleOptionalTupleTypeTypeVarUnioncastoverload)EVENTSPOLLINPOLLOUT)Literalc                   @   s6   e Zd ZU eed< eed< eed< eed< eed< dS )_FutureEventfuturekindkwargsmsgtimerN)__name__
__module____qualname__r   __annotations__strr   r    r!   r!   //tmp/pip-unpacked-wheel-h6ekxre8/zmq/_future.pyr      s
   
r   c                   @   sV   e Zd ZU dZdZeed< ee ed< edddZ	eddd	Z
dddd
dZdS )_AsynczMixin for common async logicN_current_loop_Futurereturnc                 C   sL   | j dkr&|  | _ | | j  | j S |  }|| j k	rH|| _ | | |S )zGet event loop

        Notice if event loop has changed,
        and register init_io_state on activation of a new event loop
        N)r$   _default_loop_init_io_state)selfZcurrent_loopr!   r!   r"   	_get_loop6   s    



z_Async._get_loopc                 C   s   t dd S )Nz!Must be implemented in a subclassNotImplementedErrorr*   r!   r!   r"   r(   G   s    z_Async._default_loopc                 C   s   d S Nr!   r*   loopr!   r!   r"   r)   J   s    z_Async._init_io_state)N)r   r   r   __doc__r$   r   r   r   r   r+   r(   r)   r!   r!   r!   r"   r#   0   s   
r#   c                       s   e Zd ZU dZed ed< eed< eed< ee ed< eeee	ddd	d
Z
eeddddZdeeeeef   d fddZ  ZS )_AsyncPollerz:Poller that returns a Future on poll, instead of blocking._AsyncSocket_socket_class_READ_WRITEraw_socketsN)r1   socketevtfr'   c                 C   s
   t  dS )z"Schedule callback for a raw socketNr,   )r*   r1   r9   r:   r;   r!   r!   r"   _watch_raw_socketV   s    z_AsyncPoller._watch_raw_socket)r1   socketsr'   c                 G   s
   t  dS )z$Unschedule callback for a raw socketNr,   )r*   r1   r=   r!   r!   r"   _unwatch_raw_socketsZ   s    z!_AsyncPoller._unwatch_raw_socketsr&   c              
      s     |dkr\zt d}W n, tk
rL } z | W 5 d}~X Y nX  |  S    g fdd}fdd jD ]\}}t	|t
jrt	|jsĈj|}|t
j@ r|jdd |t
j@ r|jdd q| d}|t
j@ r|jO }|t
j@ r0|jO }||| q fd	d
}| |dk	r|dkrfdd}	d| |	fdd}
 |
 fdd} |  S )z Return a Future for a poll eventr   Nc                     s      s d  d S r/   done
set_result)argswatcherr!   r"   wake_rawr   s    z#_AsyncPoller.poll.<locals>.wake_rawc                    s   j  f S r/   )r>   r;   )r1   r8   r*   r!   r"   <lambda>w       z#_AsyncPoller.poll.<locals>.<lambda>pollr   c              
      s     rd S  r:z  W n tk
r4   Y nX d S  rR  nLzttd}W n, t	k
r } z| W 5 d }~X Y nX 
| d S Nr   )rA   	cancelledcancelRuntimeError	exceptionset_exceptionsuperr3   rJ   	ExceptionrB   )r;   resulte)	__class__r   r*   rE   r!   r"   on_poll_ready   s    z(_AsyncPoller.poll.<locals>.on_poll_readyc                      s      s d  d S r/   r@   r!   rD   r!   r"   trigger_timeout   s    z*_AsyncPoller.poll.<locals>.trigger_timeoutMbP?c                    s"   t dr  n
  d S )NrN   )hasattrrN   Zremove_timeoutrG   )r1   timeout_handler!   r"   cancel_timeout   s    

z)_AsyncPoller.poll.<locals>.cancel_timeoutc                    s      s   d S r/   )rA   rN   rG   rD   r!   r"   cancel_watcher   s    z)_AsyncPoller.poll.<locals>.cancel_watcher)r%   rR   rJ   rS   rQ   rB   r+   add_done_callbackr=   
isinstance_zmqSocketr5   from_socketr   _add_recv_eventr   _add_send_eventappendr6   r7   r<   
call_later)r*   timeoutrT   rU   rF   r9   maskr:   rW   rX   r\   r]   rV   )r   r1   r8   r*   r[   rE   r"   rJ   ^   sP    








z_AsyncPoller.poll)r?   )r   r   r   r2   r   r   intr	   r   r   r<   r>   r   r   rJ   __classcell__r!   r!   ri   r"   r3   N   s   
r3   c                   @   s   e Zd Zedd ZdS )_NoTimerc                   C   s   d S r/   r!   r!   r!   r!   r"   rN      s    z_NoTimer.cancelN)r   r   r   staticmethodrN   r!   r!   r!   r"   rl      s   rl   Tr4   )boundc                
       s  e Zd ZU dZdZdZded< eZdZ	dKe
d dd fddZedLee deed	d
dZdMe
e dd fddZejjje_ fddZejjje_edNddeeeee  dddZedOddeed eeee  dddZedPddeed eeeej  dddZedQeeeeeee eej f  dddZdReeeeeee eej f  dddZdSeeeeeeejf  dddZ dTeeeee
ej!  ddd Z"dUeeeeeee
ej!  d!d"d#Z#d$d% Z$dej%fee d&d'd(Z&ee' d& fd)d*Z(dVe'ee'ed d, fd-d.Z)d/d0 Z*d1d2 Z+e,d3d4 Z-dWd5d6Z.dXd7d8Z/d9d: Z0d;d< Z1dYd=d>Z2dZd?d@Z3dAdB Z4dCdD Z5dEdF Z6d[dGdHZ7dIdJ Z8  Z9S )\r4   Nr   z_zmq.Socket_shadow_sockr?   )_from_socketr'   c                    s   t |tjrd | }}|d k	r6t j|jd || _n"t j||f| tj| j| _|d k	rztj	| j
j dtdd t | _t | _d| _| jj| _d S )N)shadowz^(io_loop) argument is deprecated in pyzmq 22.2. The currently active loop will always be used.   )
stacklevelr   )r_   r`   ra   rR   __init__Z
underlyingrp   rr   warningswarnrV   r   DeprecationWarningr   _recv_futures_send_futures_stateZFD_fd)r*   contextsocket_typeio_looprq   r   ri   r!   r"   ru      s"    
z_AsyncSocket.__init__)clsr9   r   r'   c                 C   s   | ||dS )z.Create an async socket from an existing Socket)rq   r   r!   )r   r9   r   r!   r!   r"   rb      s    z_AsyncSocket.from_socket)lingerr'   c              	      s|   | j sj| jd k	rjtt| jpg | jp$g }|D ]2}|j s.z|j  W q. t	k
r^   Y q.X q.| 
  t j|d d S )N)r   )closedr|   listr   ry   rz   r   rA   rN   rO   _clear_io_staterR   close)r*   r   
event_listeventri   r!   r"   r      s    
z_AsyncSocket.closec                    s"   t  |}|tkr| | |S r/   )rR   getr   _schedule_remaining_events)r*   keyrT   ri   r!   r"   r     s    
z_AsyncSocket.getF)track)flagsr   r'   c                C   s   d S r/   r!   )r*   r   r   r!   r!   r"   recv_multipart	  s    z_AsyncSocket.recv_multipartT)r   copyr   r'   c                C   s   d S r/   r!   r*   r   r   r   r!   r!   r"   r     s    c                C   s   d S r/   r!   r   r!   r!   r"   r     s    c                 C   s   d S r/   r!   r   r!   r!   r"   r     s    c                 C   s   |  dt|||dS )zvReceive a complete multipart zmq message.

        Returns a Future whose result will be a multipart message.
        r   r   r   r   rc   dictr   r!   r!   r"   r   !  s     c                 C   s   |  dt|||dS )zReceive a single zmq frame.

        Returns a Future, whose result will be the received frame.

        Recommend using recv_multipart instead.
        recvr   r   r   r!   r!   r"   r   ,  s    	z_AsyncSocket.recv)	msg_partsr   r   r'   c                 K   s(   ||d< ||d< ||d< | j d||dS )zqSend a complete multipart zmq message.

        Returns a Future that resolves when sending is complete.
        r   r   r   send_multipartr   r   )rd   )r*   r   r   r   r   r   r!   r!   r"   r   7  s    z_AsyncSocket.send_multipart)datar   r   r   r   r'   c                 K   s<   ||d< ||d< ||d< | t|||d | jd||dS )zSend a single zmq frame.

        Returns a Future that resolves when sending is complete.

        Recommend using send_multipart instead.
        r   r   r   r   sendr   )updater   rd   )r*   r   r   r   r   r   r!   r!   r"   r   C  s
    z_AsyncSocket.sendc                    s>   |     fdd}|  fdd} |  S )zDeserialize with Futuresc              
      st      rdS  r$   nL }z|}W n, tk
rd } z | W 5 d}~X Y nX  | dS )z+Chain result through serialization to recvdN)rA   rP   rQ   rT   rS   rB   )_bufZloadedrU   r;   loadrecvdr!   r"   _chain[  s    z)_AsyncSocket._deserialize.<locals>._chainc                    s      rdS   r  dS )z"Chain cancellation from f to recvdN)rA   rM   rN   )r   )r;   r   r!   r"   _chain_cancell  s    z0_AsyncSocket._deserialize.<locals>._chain_cancel)r%   r^   )r*   r   r   r   r   r!   r   r"   _deserializeW  s    

z_AsyncSocket._deserializer&   c                    s   j rttj }|| tt||	   fdd}
 r`| n
| fdd} |  S )zSpoll the socket for events

        returns a Future for the poll results.
        c                    st      rd S  r:z   W n tk
r4   Y nX d S |  rR   nt } |	d d S rL   )
rA   rM   rN   rO   rP   rQ   r   rT   rB   r   )r;   Zevtsr   poll_futurer*   r!   r"   unwrap_result  s    z(_AsyncSocket.poll.<locals>.unwrap_resultc                    s.      s*z   W n tk
r(   Y nX dS )z4Cancel underlying poll if request has been cancelledN)rA   rN   rO   rK   )r   r!   r"   cancel_poll  s
    z&_AsyncSocket.poll.<locals>.cancel_poll)r   r`   ZZMQErrorENOTSUP_poller_classregisterr   r   rJ   r%   rA   r^   )r*   rg   r   pr   r   r!   r   r"   rJ   w  s    

	
z_AsyncSocket.pollc                    s   t  j||S r/   )rR   recv_string)r*   rC   r   ri   r!   r"   r     s    z_AsyncSocket.recv_stringutf-8)sr   encodingr'   c                    s   t  j|||dS )N)r   r   )rR   send_string)r*   r   r   r   ri   r!   r"   r     s    z_AsyncSocket.send_stringc                    s    fdd}|  ||S )z'Add a timeout for a send or recv Futurec                      s      rd S  t  d S r/   )rA   rQ   r`   Againr!   rK   r!   r"   future_timeout  s    z1_AsyncSocket._add_timeout.<locals>.future_timeout)_call_later)r*   r   rg   r   r!   rK   r"   _add_timeout  s    z_AsyncSocket._add_timeoutc                 C   s   |   ||S )zSchedule a function to be called later

        Override for different IOLoop implementations

        Tornado and asyncio happen to both have ioloop.call_later
        with the same signature.
        )r+   rf   )r*   delaycallbackr!   r!   r"   r     s    z_AsyncSocket._call_laterc                 C   s6   t |D ]\}}|j| kr q$qdS |||  dS )zMake sure that futures are removed from the event list when they resolve

        Avoids delaying cleanup until the next send/recv event,
        which may never come.
        N)	enumerater   remove)r   r   Zf_idxr   r!   r!   r"   _remove_finished_future  s
    
z$_AsyncSocket._remove_finished_futurec           
   
      s  |p
   }|dr~|ddtj@ r~t j|}z|f |}W n, tk
rn } z|| W 5 d}~X Y nX |	| |S t
}ttdr jj}	|	dkr ||	d } jt|||d|d | fdd	  jtt@ r    jr t |S )
z4Add a recv event, returning the corresponding Futurer   r   r   NZRCVTIMEOrY   )r   r   c                    s     |  jS r/   )r   ry   rG   r.   r!   r"   rH     rI   z._AsyncSocket._add_recv_event.<locals>.<lambda>)r%   
startswithr   r`   DONTWAITgetattrrp   rS   rQ   rB   rl   rZ   Zrcvtimeor   ry   re   r   r^   r   r   _handle_recv_add_io_state)
r*   r   r   r   r;   r   rrU   r   
timeout_msr!   r.   r"   rc     s.    



z_AsyncSocket._add_recv_eventc              
      sX  |p
   }|dkr js|dd}| }|tjB |d< t j|}d}	z||f|}
W nh tjk
r } z|tj@ r|	| nd}	W 5 d}~X Y n6 t
k
r } z|	| W 5 d}~X Y nX ||
 |	r jr   |S t}ttdr jtj}|dkr ||d } jt|||||d	 | fd
d  t |S )z4Add a send event, returning the corresponding Future)r   r   r   r   TFNSNDTIMEOrY   )r   r   r   c                    s     |  jS r/   )r   rz   rG   r.   r!   r"   rH   -  rI   z._AsyncSocket._add_send_event.<locals>.<lambda>)r%   rz   r   r   r`   r   r   rp   r   rQ   rS   rB   ry   r   rl   rZ   r   r   re   r   r^   r   r   )r*   r   r   r   r   r;   r   Znowait_kwargsr   Zfinish_earlyr   rU   r   r   r!   r.   r"   rd     sB    




z_AsyncSocket._add_send_eventc           	   
   C   s
  | j tt@ sdS d}| jrD| j \}}}}}| rDd}qqDq| jsT| t |dkr`dS |  |dkr~|	d dS |dkr| j j
}n|dkr| j j}ntd| |d  tjO  < z|f |}W n, tk
r } z|| W 5 d}~X Y nX |	| dS )zHandle recv eventsNrJ   r   r   zUnhandled recv event type: %rr   )rp   r   r   r   ry   popleftrA   _drop_io_staterN   rB   r   r   
ValueErrorr`   r   rS   rQ   )	r*   r;   r   r   r   r   r   rT   rU   r!   r!   r"   r   3  s6    



z_AsyncSocket._handle_recvc           	   
   C   s  | j tt@ sd S d }| jrD| j \}}}}}| rDd }qqDq| jsT| t |d kr`d S |  |dkr~|	d  d S |dkr| j j
}n|dkr| j j}ntd| |d  tjO  < z||f|}W n, tk
r } z|| W 5 d }~X Y nX |	| d S )NrJ   r   r   zUnhandled send event type: %rr   )rp   r   r   r   rz   r   rA   r   rN   rB   r   r   r   r`   r   rS   rQ   )	r*   r;   r   r   r   r   r   rT   rU   r!   r!   r"   _handle_send\  s6    



z_AsyncSocket._handle_sendc                 C   sH   | j jrdS | j t}|tj@ r*|   |tj@ r<|   | 	  dS )z(Dispatch IO events to _handle_recv, etc.N)
rp   r   r   r   r`   r   r   r   r   r   )r*   fdeventsZ
zmq_eventsr!   r!   r"   _handle_events  s    

z_AsyncSocket._handle_eventsc                 C   s>   | j dkrdS |dkr"| jt}|| j @ r:| d| j dS )zkSchedule a call to handle_events next loop iteration

        If there are still events to handle.
        r   N)r{   rp   r   r   r   r   )r*   r   r!   r!   r"   r     s    

z'_AsyncSocket._schedule_remaining_eventsc                 C   s*   | j |kr| j |B  }| _ | | j  dS )zAdd io_state to poller.Nr{   _update_handlerr*   stater!   r!   r"   r     s    
z_AsyncSocket._add_io_statec                 C   s(   | j |@ r| j | @ | _ | | j  dS )z&Stop poller from watching an io_state.Nr   r   r!   r!   r"   r     s    
z_AsyncSocket._drop_io_statec                 C   s   |r|    |   dS )zOUpdate IOLoop handler with state.

        zmq FD is always read-only.
        N)r+   r   r   r!   r!   r"   r     s    z_AsyncSocket._update_handlerc                 C   s6   |dkr|   }|| j| j| j | d| j dS )z#initialize the ioloop event handlerNr   )r+   add_handlerrp   r   r6   r   r0   r!   r!   r"   r)     s    z_AsyncSocket._init_io_statec                 C   s.   | j }| j jr| j}| jdk	r*| j| dS )zNunregister the ioloop event handler

        called once during close
        N)rp   r   r|   r$   Zremove_handler)r*   r   r!   r!   r"   r     s
    
z_AsyncSocket._clear_io_state)Nr?   NN)N)N)r   )r   )r   )r   TF)r   TF)r   TF)r   TF)r   TF)r   r   )NN)NNN)r   r   )N)N):r   r   r   ry   rz   r{   r   r3   r   r|   r   ru   classmethodr   rn   r   rb   rj   r   r`   ra   r2   r   r   boolr   r	   bytesr   r   ZFramer   r   ZMessageTrackerr   r   r   r   rJ   r    r   r   r   r   rm   r   rc   rd   r   r   r   r   r   r   r   r)   r   rk   r!   r!   ri   r"   r4      s   
      
   
                                   3


$
8))


)%r2   rv   Zasyncior   collectionsr   	itertoolsr   typingr   r   r   r   r	   r
   r   r   r   r   r   r   r   Zzmqr`   r   r   r   Zzmq._typingr   r   r#   ZPollerr3   rl   rn   ra   r4   r!   r!   r!   r"   <module>   s   <m