U
    Cvf+W                     @  s8  d dl mZ d dlZd dlZd dlZd dlmZmZmZm	Z	m
Z
mZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ erd dlmZ d	d
 ZdddddZddddddZdddddZdddddZdd Zddddd d!Z dd"d#d$d%Z!d&d'd'd(d)d*Z"d2ddd,d-d.dd/d0d1Z#dS )3    )annotationsN)TYPE_CHECKINGAnyCallableDefaultDictHashableIterableMappingSequence)align)	DataArray)Datasetis_dask_collection)T_Xarrayc                 C  s   t |  S N)zip)iterable r   8/tmp/pip-unpacked-wheel-h316xyqg/xarray/core/parallel.pyunzip   s    r   r   )abc                 C  sT   |   } |  }t| jt|jD ](}| j| |j| kr&td|dq&d S )NzChunk sizes along dimension z are not equal.)Zunify_chunkssetchunksintersection
ValueError)r   r   dimr   r   r   assert_chunks_compatible    s
    r   zDataArray | DatasetzMapping[str, Any]str)resultexpectedkindc                 C  s~   |dkrd}n|dkrd}|| t t| | }|rJtd| d| dt t| |||  }|rztd| d| dd S )	NcoordsZ
coordinate	data_varsdataz4Result from applying user function does not contain z variables .z2Result from applying user function has unexpected )r   getattrr   )r    r!   r"   Znice_strmissingextrar   r   r   check_result_variables)   s    r*   r   )objreturnc                 C  sD   t | tstdt|  t| jdkr2tdtt| j S )NzExpected Dataset, got    zGTrying to convert Dataset with more than one data variable to DataArray)	
isinstancer   	TypeErrortypelenr$   nextitervalues)r+   r   r   r   dataset_to_dataarrayA   s    
r5   c                 C  s    | j d kr|  }n|  }|S r   )name_to_temp_datasetZ
to_dataset)r+   datasetr   r   r   dataarray_to_datasetM   s    

r9   c                 C  s   t | tr| }t| } nt | tr(d}n| S ddlm} t }| j D ]*\}}||j|j	d}|j
||jf||< qH| j|_|| j}|dk	rt|S |S )zIf obj is a DataArray or Dataset, return a new object of the same type and with
    the same variables and dtypes, but where all variables have size 0 and numpy
    backend.
    If obj is neither a DataArray nor Dataset, return it unaltered.
    Nr   )meta_from_array)ndim)r.   r   r9   r   Zdask.array.utilsr:   	variablesitemsr%   r;   dimsattrs
set_coordsr#   r5   )r+   Z	obj_arrayr:   metar6   variableZmeta_objr   r   r   	make_metaX   s     


rC   zCallable[..., T_Xarray]r   )funcr+   r,   c              
   O  st   dd |f| D }z| ||}W n, t k
rN } zt d|W 5 d}~X Y nX t|ttfsptdt| |S )z<Infer return object by running the function on meta objects.c                 S  s   g | ]}t |qS r   )rC   .0argr   r   r   
<listcomp>x   s     z"infer_template.<locals>.<listcomp>zsCannot infer object returned from running user provided function. Please supply the 'template' kwarg to map_blocks.NzIFunction must return an xarray DataArray or Dataset. Instead it returned )	Exceptionr.   r   r   r/   r0   )rD   r+   argskwargsZ	meta_argstemplateer   r   r   infer_templatet   s    rN   zdict[Hashable, Any])xr,   c                 C  s&   t | tr|  } dd | j D S )zWMap variable name to numpy(-like) data
    (Dataset.to_dict() is too complicated).
    c                 S  s   i | ]\}}||j qS r   )r%   rF   kvr   r   r   
<dictcomp>   s      zmake_dict.<locals>.<dictcomp>)r.   r   r7   r<   r=   rO   r   r   r   	make_dict   s    
rU   r   r	   )r   chunk_indexchunk_boundsc                 C  s6   | |kr.||  }t ||  | ||  |d  S t d S )Nr-   )slice)r   rV   rW   Zwhich_chunkr   r   r   _get_chunk_slicer   s    rY   r   zSequence[Any]zMapping[str, Any] | NonezDataArray | Dataset | None)rD   r+   rJ   rK   rL   r,   c           )   
     sb  dddddddd}dk	rBt ttfsBtd	tj d
t |tsTtd|dkrbi }nt |tsttd| D ]}t	|r|tdq|t	|s| |f||S z ddl
ddlddlm} W n tk
r   Y nX |gt| }dd |D dd |D }	tfddt|D \}
}fddt|D }t|ddi}tdd |D }tttt|
|| dd d\}}t|d jt|d j|dd D ],}t|d | |j |j qdkr\t| |d f||tj}|t@ }|t }fdd|D fdd|D  fd djD 
ntjj

sztd!
D ]V}|kr~t | t 
| kr~td"|d#t |  d$t 
|  d%q~t trd&}j!}" n$t trd'}ntd(t i t#$t}d)%j&'| j()|d ||d*d * D }d+d * D d,d 
* D 	dd-d.d/fd0d1t+j,|  D ]:}tt|- |  fd2dt|D }i }
fd3d * D |d4< tj.- |d5< tj/- |d6<  	fd7dD |d8< f| }|| |||	|f|< i }j0* D ]x\}}|kr|qf| d9 } | ||< | f}!|jD ](}| kr|! | f7 }!n|!d:7 }!qt1j2||f||  |!< qfq|j3fd;d|D d<}"||"j4||"j5fd=d|- D d<}"td>d * D j6d?}#|#jD ]&}$|$ j6|#|$ _6|$ j7|#|$ _7qN|* D ]\}} | j}%g }&|%D ]^}|
kr|&8
|  n>||#jkr|&8|#j9| f n|jkr|&8j9| f qj:j;|"| |&| j<d@}'|%|'| j6f|#|< | j7|#| _7q~|#=j>}#|r^t?|#}(||(_!|(S |#S )Aa  Apply a function to each block of a DataArray or Dataset.

    .. warning::
        This function is experimental and its signature may change.

    Parameters
    ----------
    func : callable
        User-provided function that accepts a DataArray or Dataset as its first
        parameter ``obj``. The function will receive a subset or 'block' of ``obj`` (see below),
        corresponding to one chunk along each chunked dimension. ``func`` will be
        executed as ``func(subset_obj, *subset_args, **kwargs)``.

        This function must return either a single DataArray or a single Dataset.

        This function cannot add a new chunked dimension.
    obj : DataArray, Dataset
        Passed to the function as its first argument, one block at a time.
    args : sequence
        Passed to func after unpacking and subsetting any xarray objects by blocks.
        xarray objects in args must be aligned with obj, otherwise an error is raised.
    kwargs : mapping
        Passed verbatim to func after unpacking. xarray objects, if any, will not be
        subset to blocks. Passing dask collections in kwargs is not allowed.
    template : DataArray or Dataset, optional
        xarray object representing the final result after compute is called. If not provided,
        the function will be first run on mocked-up data, that looks like ``obj`` but
        has sizes 0, to determine properties of the returned object such as dtype,
        variable names, attributes, new dimensions and new indexes (if any).
        ``template`` must be provided if the function changes the size of existing dimensions.
        When provided, ``attrs`` on variables in `template` are copied over to the result. Any
        ``attrs`` set by ``func`` will be ignored.

    Returns
    -------
    A single DataArray or Dataset with dask backend, reassembled from the outputs of the
    function.

    Notes
    -----
    This function is designed for when ``func`` needs to manipulate a whole xarray object
    subset to each block. Each block is loaded into memory. In the more common case where
    ``func`` can work on numpy arrays, it is recommended to use ``apply_ufunc``.

    If none of the variables in ``obj`` is backed by dask arrays, calling this function is
    equivalent to calling ``func(obj, *args, **kwargs)``.

    See Also
    --------
    dask.array.map_blocks, xarray.apply_ufunc, xarray.Dataset.map_blocks
    xarray.DataArray.map_blocks

    Examples
    --------
    Calculate an anomaly from climatology using ``.groupby()``. Using
    ``xr.map_blocks()`` allows for parallel operations with knowledge of ``xarray``,
    its indices, and its methods like ``.groupby()``.

    >>> def calculate_anomaly(da, groupby_type="time.month"):
    ...     gb = da.groupby(groupby_type)
    ...     clim = gb.mean(dim="time")
    ...     return gb - clim
    ...
    >>> time = xr.cftime_range("1990-01", "1992-01", freq="M")
    >>> month = xr.DataArray(time.month, coords={"time": time}, dims=["time"])
    >>> np.random.seed(123)
    >>> array = xr.DataArray(
    ...     np.random.rand(len(time)),
    ...     dims=["time"],
    ...     coords={"time": time, "month": month},
    ... ).chunk()
    >>> array.map_blocks(calculate_anomaly, template=array).compute()
    <xarray.DataArray (time: 24)>
    array([ 0.12894847,  0.11323072, -0.0855964 , -0.09334032,  0.26848862,
            0.12382735,  0.22460641,  0.07650108, -0.07673453, -0.22865714,
           -0.19063865,  0.0590131 , -0.12894847, -0.11323072,  0.0855964 ,
            0.09334032, -0.26848862, -0.12382735, -0.22460641, -0.07650108,
            0.07673453,  0.22865714,  0.19063865, -0.0590131 ])
    Coordinates:
      * time     (time) object 1990-01-31 00:00:00 ... 1991-12-31 00:00:00
        month    (time) int64 1 2 3 4 5 6 7 8 9 10 11 12 1 2 3 4 5 6 7 8 9 10 11 12

    Note that one must explicitly use ``args=[]`` and ``kwargs={}`` to pass arguments
    to the function being applied in ``xr.map_blocks()``:

    >>> array.map_blocks(
    ...     calculate_anomaly,
    ...     kwargs={"groupby_type": "time.year"},
    ...     template=array,
    ... )  # doctest: +ELLIPSIS
    <xarray.DataArray (time: 24)>
    dask.array<<this-array>-calculate_anomaly, shape=(24,), dtype=float64, chunksize=(24,), chunktype=numpy.ndarray>
    Coordinates:
      * time     (time) object 1990-01-31 00:00:00 ... 1991-12-31 00:00:00
        month    (time) int64 dask.array<chunksize=(24,), meta=np.ndarray>
    r   listdictzIterable[bool])rD   rJ   rK   arg_is_arrayr!   c              	   S  s  dd t ||D }| ||}t|d t|j }|rHtd| d|j D ]\}}	||d kr|j| |d | krtd|d|j|  d|d |  d	||d
 krR|d
 | }
|	|
sRtd|d|
d|	dqRt||d t|t	r
t||d t
|S )z
        Wrapper function that receives datasets in args; converts to dataarrays when necessary;
        passes these to the user function `func` and checks returned objects for expected shapes/sizes/etc.
        c                 S  s    g | ]\}}|rt |n|qS r   )r5   )rF   is_arrayrG   r   r   r   rH     s   z0map_blocks.<locals>._wrapper.<locals>.<listcomp>shapeszDimensions z missing on returned object.zReceived dimension z of length z. Expected length r&   indexeszExpected index  to be . Received 	 instead.r#   r$   )r   r   sizesr   _indexesr=   equalsr*   r.   r   rU   )rD   rJ   rK   r\   r!   Zconverted_argsr    Zmissing_dimensionsr6   indexZexpected_indexr   r   r   _wrapper  s2    

$
zmap_blocks.<locals>._wrapperNz2template must be a DataArray or Dataset. Received rb   z7args must be a sequence (for example, a list or tuple).z.kwargs must be a mapping (for example, a dict)zgCannot pass dask collections in kwargs yet. Please compute or load values before passing to map_blocks.r   )HighLevelGraphc                 S  s   g | ]}t |ttfqS r   )r.   r   r   rE   r   r   r   rH   R  s     zmap_blocks.<locals>.<listcomp>c                 S  s   g | ]}t |tqS r   )r.   r   rE   r   r   r   rH   S  s     c                 3  s"   | ]\}} | r||fV  qd S r   r   rF   rf   rG   	is_xarrayr   r   	<genexpr>V  s     zmap_blocks.<locals>.<genexpr>c                   s    g | ]\}} | s||fqS r   r   ri   rj   r   r   rH   Y  s     joinexactc                 s  s$   | ]}t |trt|n|V  qd S r   )r.   r   r9   rE   r   r   r   rl   _  s   c                 S  s   | d S )Nr   r   rT   r   r   r   <lambda>e      zmap_blocks.<locals>.<lambda>)keyr-   c                   s   i | ]}| | qS r   r   rF   r   )input_indexesr   r   rS   v  s      zmap_blocks.<locals>.<dictcomp>c                   s   i | ]}| j | qS r   )rd   )rF   rQ   )rL   r   r   rS   w  s      c                   s   i | ]}| kr| | qS r   r   rr   )input_chunksr   r   rS   x  s      zjProvided template has no dask arrays.  Please construct a template with appropriately chunked dask arrays.z~map_blocks requires that one block of the input maps to one block of output. Expected number of output chunks along dimension r`   ra   zP instead. Please provide template if not provided, or fix the provided template.TFz.func output must be DataArray or Dataset; got z{}-{}c                 S  s   i | ]\}}|t t|qS r   )ranger1   rF   r   Zchunks_vr   r   r   rS     s      c                 S  s    i | ]\}}|t d | qS r   npZcumsumrv   r   r   r   rS     s     c                 S  s    i | ]\}}|t d | qS rw   ry   rv   r   r   r   rS     s     r   r   )graphgnamer8   c                   s   g }g }t   }|j D ]\}}	|	jr|	 }
|	jD ]}|
 |  }
q@| d| d|
d  f| }t |	j|
|	jgf| |< nX fdd|	jD }|		|}| d| dj
| f| }t |j||jgf| |< ||jkr|||g q|||g qtt|ft|f|jfS )z
        Creates a task that subsets an xarray dataset to a block determined by chunk_index.
        Block extents are determined by input_chunk_bounds.
        Also subtasks that subset the constituent variables of a dataset.
        -r   c                   s   i | ]}|t | qS r   rY   rr   rV   input_chunk_boundsr   r   rS     s    z?map_blocks.<locals>.subset_dataset_to_block.<locals>.<dictcomp>)tupler4   r<   r=   r   r%   Z__dask_keys__r>   r?   Ziselbasetokenize_coord_namesappendr   r[   )r{   r|   r8   r   rV   r$   r#   chunk_tupler6   rB   chunkr   Zchunk_variable_taskZ	subsetterZsubsetdaskr   r   subset_dataset_to_block  s6    



z+map_blocks.<locals>.subset_dataset_to_blockc                   s(   g | ] \}}|r | n|qS r   r   )rF   ZisxrrG   )rV   r|   r{   r   r   r   r   rH     s   c                   s&   i | ]\}}| kr| | | qS r   r   rP   )output_chunksr   r   rS     s      r^   r$   r#   c                   s"   i | ]}|| t |  qS r   r~   rr   )rV   r_   output_chunk_boundsr   r   rS     s    r_   r}   rx   c                   s   g | ]}  |r|qS r   r   rE   r   r   r   rH     s     
 )dependenciesc                   s   i | ]}| hqS r   r   )rF   r6   )r|   r   r   rS   &  s      c                 S  s   i | ]\}}||  qS r   )Zto_pandas_index)rF   rQ   idxr   r   r   rS   -  s      )r#   r?   )r6   r   dtype)@r.   r   r   r/   r0   __name__r
   r	   r4   r   r   Z
dask.arrayZdask.highlevelgraphrh   ImportErrorrZ   r   	enumerater   r   sortedr   r[   r   rd   r   updaterN   r   r>   Z
chunksizesr   r1   r6   r7   collectionsdefaultdictformatutilsfuncnamer   r   r=   	itertoolsproductkeysr$   r#   r<   operatorgetitemZfrom_collectionsZlayersr   r?   encodingr   rc   arrayArrayr   r@   r   r5   ))rD   r+   rJ   rK   rL   rg   valuerh   Zall_argsr]   Zxarray_indicesZxarray_objsZothersZaligned_ZnpargsrG   Ztemplate_indexesZpreserved_indexesZnew_indexesr   Zresult_is_arrayZtemplate_nameZ
new_layersZichunkr   Zblocked_argsr!   Zfrom_wrapperZvar_key_mapr6   rB   Zgname_lrq   Zhlgr    rf   r>   Z
var_chunksr%   dar   )rV   r   r|   r{   r_   r   rt   rs   rk   r   r   r   rL   r   
map_blocks   s@   h0






$&


 7	










   r   )r   NN)$
__future__r   r   r   r   typingr   r   r   r   r   r   r	   r
   Znumpyrz   Zxarray.core.alignmentr   Zxarray.core.dataarrayr   Zxarray.core.datasetr   Zxarray.core.pycompatr   Zxarray.core.typesr   r   r   r*   r5   r9   rC   rN   rU   rY   r   r   r   r   r   <module>   s0   (	

   