U
    luf(                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZ d dlmZ G d	d
 d
eZeG dd dejZdS )    N)gen)IOStream)app_log)	TCPServer)skipIfNonUnix)AsyncTestCase	ExpectLogbind_unused_portgen_test)Tuplec                   @   s8   e Zd Zedd Zedd Zdd Zedd Zd	S )
TCPServerTestc              	   c   s   G dd dt }d  }}znt \}}| }|| tt }ttd4 |	d|fV  |
dV  | V  tjV  W 5 Q R X W 5 |d k	r|  |d k	r|  X d S )Nc                   @   s   e Zd Zejdd ZdS )zFTCPServerTest.test_handle_stream_coroutine_logging.<locals>.TestServerc                 s   s$   | tdV  |  dd  d S )N   hello   r   )
read_byteslencloseselfstreamaddress r   ?/tmp/pip-unpacked-wheel-bmg6zs32/tornado/test/tcpserver_test.pyhandle_stream   s    zTTCPServerTest.test_handle_stream_coroutine_logging.<locals>.TestServer.handle_streamN__name__
__module____qualname__r   	coroutiner   r   r   r   r   
TestServer   s   r   zException in callback	localhostr   )r   stopr   r	   
add_socketr   socketr   r   connectwriteread_until_closer   Zmoment)r   r   serverclientsockportr   r   r   $test_handle_stream_coroutine_logging   s     


z2TCPServerTest.test_handle_stream_coroutine_loggingc                 c   sp   G dd dt }t \}}| }|| tt }|d|fV  | V }| |d |  |	  d S )Nc                   @   s   e Zd Zdd ZdS )zETCPServerTest.test_handle_stream_native_coroutine.<locals>.TestServerc                    s   | d |  d S )N   data)r$   r   r   r   r   r   r   3   s    
zSTCPServerTest.test_handle_stream_native_coroutine.<locals>.TestServer.handle_streamN)r   r   r   r   r   r   r   r   r   2   s   r   r   r+   )
r   r	   r!   r   r"   r#   r%   assertEqualr    r   )r   r   r(   r)   r&   r'   resultr   r   r   #test_handle_stream_native_coroutine.   s    


z1TCPServerTest.test_handle_stream_native_coroutinec                 C   s.   t  \}}t }|| |  |  d S N)r	   r   r!   r    )r   r(   r)   r&   r   r   r   test_stop_twiceA   s
    

zTCPServerTest.test_stop_twicec              	   #   s   G fdddt }t \}}| | d|fd}dd t|D }g tjfdd  fd	d|D V  | td
d zt|kr| 	d W 5 D ]}|  qX d S )Nc                       s   e Zd Zej fddZdS )z7TCPServerTest.test_stop_in_callback.<locals>.TestServerc                 3   s       | V  d S r/   )r    r%   r   r&   r   r   r   O   s    zETCPServerTest.test_stop_in_callback.<locals>.TestServer.handle_streamNr   r   r1   r   r   r   N   s   r   r   (   c                 S   s   g | ]}t t qS r   )r   r"   ).0ir   r   r   
<listcomp>Y   s     z7TCPServerTest.test_stop_in_callback.<locals>.<listcomp>c                 3   s4   z|  V  W n tk
r$   Y nX  |  d S r/   )r#   EnvironmentErrorappend)c)connected_clientsserver_addrr   r   r#   \   s
    z4TCPServerTest.test_stop_in_callback.<locals>.connectc                    s   g | ]} |qS r   r   )r3   r8   )r#   r   r   r5   e   s     r   zall clients failed connectingzHat least one client should fail connecting for the test to be meaningful)
r   r	   r!   ranger   r   ZassertGreaterr   r   ZskipTest)r   r   r(   r)   NZclientsr8   r   )r#   r9   r&   r:   r   test_stop_in_callbackH   s&    

z#TCPServerTest.test_stop_in_callbackN)r   r   r   r
   r*   r.   r0   r=   r   r   r   r   r      s   

r   c                   @   sD   e Zd Zeeeef dddZdd Zdd Zdd	 Zd
d Z	dS )TestMultiprocess)codereturnc              
   C   sr   zt jtjdgd|ddd}W nF t jk
rd } z&td|j d|j d|j |W 5 d }~X Y nX |j|jfS )Nz-Werror::DeprecationWarningTutf8)capture_outputinputencodingcheckzProcess returned z stdout=z stderr=)	
subprocessrunsys
executableCalledProcessErrorRuntimeError
returncodestdoutstderr)r   r?   r-   er   r   r   run_subproc~   s    
zTestMultiprocess.run_subprocc                 C   s>   t d}| |\}}| dt|d | |d d S )Na  
            import asyncio
            from tornado.tcpserver import TCPServer

            async def main():
                server = TCPServer()
                server.listen(0, address='127.0.0.1')

            asyncio.run(main())
            print('012', end='')
         012textwrapdedentrP   r,   joinsortedr   r?   outerrr   r   r   test_listen_single   s    z#TestMultiprocess.test_listen_singlec                 C   s>   t d}| |\}}| dt|d | |d d S )Na  
            import warnings

            from tornado.ioloop import IOLoop
            from tornado.process import task_id
            from tornado.tcpserver import TCPServer

            warnings.simplefilter("ignore", DeprecationWarning)

            server = TCPServer()
            server.bind(0, address='127.0.0.1')
            server.start(3)
            IOLoop.current().run_sync(lambda: None)
            print(task_id(), end='')
        rQ   rR   rS   rX   r   r   r   test_bind_start   s    z TestMultiprocess.test_bind_startc                 C   s>   t d}| |\}}| dt|d | |d d S )Na  
            import asyncio
            from tornado.netutil import bind_sockets
            from tornado.process import fork_processes, task_id
            from tornado.ioloop import IOLoop
            from tornado.tcpserver import TCPServer

            sockets = bind_sockets(0, address='127.0.0.1')
            fork_processes(3)
            async def post_fork_main():
                server = TCPServer()
                server.add_sockets(sockets)
            asyncio.run(post_fork_main())
            print(task_id(), end='')
        rQ   rR   rS   rX   r   r   r   test_add_sockets   s    z!TestMultiprocess.test_add_socketsc                 C   s>   t d}| |\}}| dt|d | |d d S )Na  
            import asyncio
            import socket
            from tornado.netutil import bind_sockets
            from tornado.process import task_id, fork_processes
            from tornado.tcpserver import TCPServer

            # Pick an unused port which we will be able to bind to multiple times.
            (sock,) = bind_sockets(0, address='127.0.0.1',
                family=socket.AF_INET, reuse_port=True)
            port = sock.getsockname()[1]

            fork_processes(3)

            async def main():
                server = TCPServer()
                server.listen(port, address='127.0.0.1', reuse_port=True)
            asyncio.run(main())
            print(task_id(), end='')
            rQ   rR   rS   rX   r   r   r   test_listen_multi_reuse_port   s    z-TestMultiprocess.test_listen_multi_reuse_portN)
r   r   r   strr   rP   r[   r\   r]   r^   r   r   r   r   r>   w   s
   r>   )r"   rF   rH   rT   ZunittestZtornador   Ztornado.iostreamr   Ztornado.logr   Ztornado.tcpserverr   Ztornado.test.utilr   Ztornado.testingr   r   r	   r
   typingr   r   ZTestCaser>   r   r   r   r   <module>   s   f