U
    NvfA(                  
   @   s   d dl Z d dlZd dlZd dlZddlmZmZ ddlm	Z	 ddl
mZ dgZe Zd adadadd ZdddZG dd deZdS )    N   )ProcessPoolExecutorEXTRA_QUEUED_CALLS)	cpu_count)get_contextget_reusable_executorc               
   C   s,   t  t} td7 a| W  5 Q R  S Q R X dS )zEnsure that each successive executor instance has a unique, monotonic id.

    The purpose of this monotonic id is to help debug and test automated
    instance creation.
    r   N)_executor_lock_next_executor_id)executor_id r   K/tmp/pip-unpacked-wheel-dylwa62s/joblib/externals/loky/reusable_executor.py_get_next_executor_id   s    r   
   Fautor   c
                 C   s&   t j| |||||||||	d
\}
}|
S )a  Return the current ReusableExectutor instance.

    Start a new instance if it has not been started already or if the previous
    instance was left in a broken state.

    If the previous instance does not have the requested number of workers, the
    executor is dynamically resized to adjust the number of workers prior to
    returning.

    Reusing a singleton instance spares the overhead of starting new worker
    processes and importing common python packages each time.

    ``max_workers`` controls the maximum number of tasks that can be running in
    parallel in worker processes. By default this is set to the number of
    CPUs on the host.

    Setting ``timeout`` (in seconds) makes idle workers automatically shutdown
    so as to release system resources. New workers are respawn upon submission
    of new tasks so that ``max_workers`` are available to accept the newly
    submitted tasks. Setting ``timeout`` to around 100 times the time required
    to spawn new processes and import packages in them (on the order of 100ms)
    ensures that the overhead of spawning workers is negligible.

    Setting ``kill_workers=True`` makes it possible to forcibly interrupt
    previously spawned jobs to get a new instance of the reusable executor
    with new constructor argument values.

    The ``job_reducers`` and ``result_reducers`` are used to customize the
    pickling of tasks and results send to the executor.

    When provided, the ``initializer`` is run first in newly spawned
    processes with argument ``initargs``.

    The environment variable in the child process are a copy of the values in
    the main process. One can provide a dict ``{ENV: VAL}`` where ``ENV`` and
    ``VAL`` are string literals to overwrite the environment variable ``ENV``
    in the child processes to value ``VAL``. The environment variables are set
    in the children before any module is loaded. This only works with the
    ``loky`` context.
    )
max_workerscontexttimeoutkill_workersreusejob_reducersresult_reducersinitializerinitargsenv)_ReusablePoolExecutorr   )r   r   r   r   r   r   r   r   r   r   	_executor_r   r   r   r   %   s    4
c                       sT   e Zd Zd fdd	Zedd	d
Z fddZdd Zdd Z fddZ	  Z
S )r   Nr   r   c              
      s,   t  j|||||||	|
d || _|| _d S )N)r   r   r   r   r   r   r   r   )super__init__r
   _submit_resize_lock)selfZsubmit_resize_lockr   r   r   r
   r   r   r   r   r   	__class__r   r   r   i   s    
z_ReusablePoolExecutor.__init__r   Fr   c              
   C   s  t  t}|d kr4|dkr,|d k	r,|j}qLt }n|dkrLtd| dt|tr^t|}|d k	rz| dkrztdt	||||||	|
d}|d krd}t
jd	| d t }|a| t f||d
| a}n|dkr|tk}|jjs|jjs|st|jjrd}n|jjrd}nd}t
jd| d| d |jd|d d  a }a| jf d|i|W  5 Q R  S t
jd|j d d}|| W 5 Q R X ||fS )NTr   z(max_workers must be greater than 0, got .forkz4Cannot use reusable executor with the 'fork' context)r   r   r   r   r   r   r   Fz#Create a executor with max_workers=)r   r
   r   brokenshutdownzarguments have changedz)Creating a new executor with max_workers=z, as the previous instance cannot be reused (z).)waitr   r   z+Reusing existing executor with max_workers=)r   r   _max_workersr   
ValueError
isinstancestrr   Zget_start_methoddictmputildebugr   _executor_kwargs_flagsr%   r&   r   _resize)clsr   r   r   r   r   r   r   r   r   r   executorkwargsZ	is_reusedr
   reasonr   r   r   r      s    

	


z+_ReusablePoolExecutor.get_reusable_executorc              
      s2   | j " t j|f||W  5 Q R  S Q R X d S N)r   r   submit)r    fnargsr5   r!   r   r   r8      s    z_ReusablePoolExecutor.submitc              
   C   s  | j  |d krtdn|| jkr4W 5 Q R  d S | jd krR|| _W 5 Q R  d S |   | jH t| j }t	dd |D }|| _t
||D ]}| jd  qW 5 Q R X t| j|kr| jjstd q|   t| j }tdd |D std qW 5 Q R X d S )Nz&Trying to resize with max_workers=Nonec                 s   s   | ]}|  V  qd S r7   is_alive.0pr   r   r   	<genexpr>   s     z0_ReusablePoolExecutor._resize.<locals>.<genexpr>MbP?c                 s   s   | ]}|  V  qd S r7   r;   r=   r   r   r   r@     s     )r   r)   r(   Z_executor_manager_thread_wait_job_completionZ_processes_management_locklistZ
_processesvaluessumrangeZ_call_queueputlenr1   r%   timesleepZ_adjust_process_countall)r    r   Z	processesZnb_children_aliver   r   r   r   r2      s0    



z_ReusablePoolExecutor._resizec                 C   s>   | j r(tdt tjd| j d | j r:t	d q(dS )z8Wait for the cache to be empty before resizing the pool.z\Trying to resize an executor with running jobs: waiting for jobs completion before resizing.z	Executor z, waiting for jobs completion before resizingrA   N)
Z_pending_work_itemswarningswarnUserWarningr-   r.   r/   r
   rI   rJ   )r    r   r   r   rB     s    z*_ReusablePoolExecutor._wait_job_completionc                    s$   dt   t }t j|||d d S )N   )
queue_size)r   r   r   _setup_queues)r    r   r   rP   r!   r   r   rQ     s      z#_ReusablePoolExecutor._setup_queues)	NNNr   NNNr   N)
NNr   Fr   NNNr   N)__name__
__module____qualname__r   classmethodr   r8   r2   rB   rQ   __classcell__r   r   r!   r   r   h   s4                      [#r   )
NNr   Fr   NNNr   N)rI   rL   	threadingmultiprocessingr-   Zprocess_executorr   r   Zbackend.contextr   backendr   __all__RLockr   r	   r   r0   r   r   r   r   r   r   r   <module>   s0             
C