# Using Dask for Arrays
# This post includes code from the book "Scalable Data Analysis in Python with Dask".
import numpy as np
import dask.array as da
# A plain NumPy array of 20 random ints in [0, 20) — computed eagerly.
np_arr = np.random.randint(20, size=20)
np_arr
# Dask equivalent: same call, but partitioned into chunks of 5 elements.
# Displaying it shows metadata only — nothing has been computed yet.
dask_arr = da.random.randint(20, size=20, chunks=5)
dask_arr
## This is simply because Dask does lazy evaluation.
### You need to call `compute()` to start the execution
dask_arr.compute()
# `chunks` shows how the array is partitioned along each axis.
dask_arr.chunks
# Wrap an existing NumPy array as a chunked Dask array (blocks of 5).
dask_arr_from_np = da.from_array(np_arr, chunks=5)
dask_arr_from_np
dask_arr_from_np.compute()
### array operations build a graph of tasks
#### See : http://docs.dask.org/en/latest/graphviz.html
# `visualize()` renders the task graph (requires graphviz to be installed).
dask_arr_from_np.sum().visualize()
# rankdir="LR" lays the graph out left-to-right instead of bottom-to-top.
dask_arr_from_np.sum().visualize(rankdir="LR")
(dask_arr_from_np+1).visualize(rankdir="LR")
# da.mean is lazy too: it only extends the task graph.
dask_arr_mean = da.mean(dask_arr_from_np)
dask_arr_mean.compute()
dask_arr_mean.visualize(rankdir="LR")
# Element-wise arithmetic on two chunked arrays; results stay lazy until
# compute() is called.
x = da.random.random(10, chunks=2)
y = da.random.random(10, chunks=2)
sum_x_y = da.add(x, y) #similar to numpy.add
mean_x_y = da.mean(sum_x_y)
sum_x_y.compute()
# Compare the two graphs: the mean builds on top of the add's tasks.
sum_x_y.visualize()
mean_x_y.visualize()
# A 50000x50000 random-int array: too large to hold as one NumPy array on
# most machines, but fine as a lazy Dask array of (5000, 1000) chunks.
da_arr_large = da.random.randint(10000, size=(50000, 50000),
chunks=(5000, 1000))
da_sum_large = da_arr_large.sum()
### Get no. bytes using `nbytes` : http://docs.dask.org/en/latest/array-api.html#dask.array.Array.nbytes
da_arr_large.nbytes
### Convert bytes to GB, 1 GB = 1e+9 bytes
da_arr_large.nbytes/1e+9
# Triggers the full chunked computation of the sum (may take a while).
da_sum_large.compute()
# Dask 2
# Two 500x500 random-int arrays, chunked into row-blocks of shape (10, 500).
size_tuple = (500,500)
chunks_tuple = (10,500)
da_arr = da.random.randint(10, size=size_tuple,
chunks=chunks_tuple)
da_arr2 = da.random.randint(10, size=size_tuple,
chunks=chunks_tuple)
def random_func(x):
    """Double *x*, transpose, square element-wise, and average over axis 0.

    Equivalent to ``np.mean(((x * 2).T) ** 2, axis=0)``.
    """
    doubled_t = (x * 2).T
    return np.square(doubled_t).mean(axis=0)
# Wrap random_func as a generalized ufunc: signature "(i)->()" maps each
# length-i row to a scalar; vectorize=True applies it row by row.
gufoo = da.gufunc(random_func, signature="(i)->()",
output_dtypes=float,
vectorize=True)
random_op_arr = gufoo(da_arr)
random_op_arr.compute()
# One scalar per input row: shape collapses from (500, 500) to (500,).
random_op_arr.shape
@da.as_gufunc(signature="(m,n),(n,j)->(m,j)", output_dtypes=int, allow_rechunk=True)
def random_func(x, y):
    """Matrix-multiply x (m x n) by y (n x j), then square element-wise."""
    product = x @ y
    return product ** 2
# Mismatched chunkings on the contracted axis; allow_rechunk=True (set on
# the decorator above) lets Dask re-chunk them to align.
da_arr3 = da.random.randint(10, size=(200, 100), chunks=(10, 100))
da_arr4 = da.random.randint(10, size=(100, 300), chunks=(5,5))
# BUG FIX: `random_matmul` was only defined on a commented-out line, so the
# `.shape` access below raised NameError. Building the lazy result is cheap;
# only `.compute()` is expensive, so it stays commented out.
random_matmul = random_func(da_arr3, da_arr4)
# random_matmul.compute()
# Lazy result already knows its shape: (200, 300).
random_matmul.shape
# Dask 3
# 20 random ints split into chunks of 3 (the last chunk holds the remainder).
my_arr = da.random.randint(10, size=20, chunks=3)
my_arr.compute()
# Scalar arithmetic is applied lazily, element-wise, chunk by chunk.
my_hundred_arr = my_arr + 100
my_hundred_arr.compute()
(my_arr * (-1)).compute()
dask_sum = my_arr.sum()
dask_sum
my_arr.compute()
dask_sum.compute()
# 10x10 array of ones in 2x2 blocks; reductions can run along either axis.
my_ones_arr = da.ones((10,10), chunks=2, dtype=int)
my_ones_arr.compute()
my_ones_arr.mean(axis=0).compute()
# 4x4 array chunked one row per block.
my_custom_array = da.random.randint(10, size=(4,4), chunks=(1,4))
my_custom_array.compute()
# axis=0: column means; axis=1: row means.
my_custom_array.mean(axis=0).compute()
my_custom_array.mean(axis=1).compute()
## Slicing
# NumPy-style slicing is lazy; only the chunks that overlap the slice are read.
my_custom_array[1:3, 2:4]
my_custom_array[1:3, 2:4].compute()
## Broadcasting
my_custom_array.compute()
# A length-4 1-D array broadcasts across each row of the 4x4 array,
# following NumPy broadcasting rules.
my_small_arr = da.ones(4, chunks=2)
my_small_arr.compute()
brd_example1 = da.add(my_custom_array, my_small_arr)
brd_example1.compute()
# full_like: same shape/chunks as my_small_arr, every element set to 10.
ten_arr = da.full_like(my_small_arr, 10)
ten_arr.compute()
brd_example2 = da.add(my_custom_array, ten_arr)
brd_example2.compute()
## Reshaping
my_custom_array.shape
# 4x4 -> flat length-16 array (lazy, like NumPy reshape).
custom_arr_1d = my_custom_array.reshape(16)
custom_arr_1d
custom_arr_1d.compute()
# Stacking
# stack adds a new axis: two (4,4) arrays -> (2,4,4), or (4,2,4) with axis=1.
stacked_arr = da.stack([brd_example1, brd_example2])
stacked_arr.compute()
another_stacked = da.stack([brd_example1, brd_example2], axis=1)
another_stacked.compute()
# Concatenate
# concatenate joins along an existing axis: (8,4) for axis=0, (4,8) for axis=1.
concate_arr = da.concatenate([brd_example1, brd_example2])
concate_arr.compute()
another_concate_arr = da.concatenate([brd_example1, brd_example2],axis=1)
another_concate_arr.compute()
# Dask 4
# Benchmark: the same element-wise expression, eager NumPy vs chunked Dask.
import numpy as np
import dask.array as da
size_tuple = (18000,18000)
np_arr = np.random.randint(10, size=size_tuple)
np_arr2 = np.random.randint(10, size=size_tuple)
# NOTE: %time is an IPython magic — this file only runs in IPython/Jupyter.
%time (((np_arr * 2).T)**2 + np_arr2 + 100).sum(axis=1).mean()
chunks_tuple = (500, 500)
# Same data wrapped as Dask arrays so the expression runs chunk-by-chunk.
da_arr = da.from_array(np_arr, chunks=chunks_tuple)
da_arr2 = da.from_array(np_arr2, chunks=chunks_tuple)
%time (((da_arr * 2).T)**2 + da_arr2 + 100).sum(axis=1).mean().compute()
size_tuple = (50000, 50000)
np_arr = np.random.randint(10, size=size_tuple)
np_arr2 = np.random.randint(10, size=size_tuple)
chunks_tuple = (5000, 5000)
da_arr = da.random.randint(10, size=size_tuple,
chunks=chunks_tuple)
da_arr2 = da.random.randint(10, size=size_tuple,
chunks=chunks_tuple)
%time (((da_arr * 2).T)**2 + da_arr2 + 100).sum(axis=1).mean().compute()
da_arr.nbytes/1e+9