from dask.distributed import Client

# Start a local Dask cluster with 4 workers and attach a client to it.
# Dask computations issued afterwards use this client by default.
client = Client(
    n_workers=4,
)
client  # rich notebook display of the client / cluster summary
/opt/venv/lib/python3.7/site-packages/distributed/node.py:155: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 45123 instead
  http_address["port"], self.http_server.port

Client

Cluster

  • Workers: 4
  • Cores: 4
  • Memory: 5.00 GB
import pandas as pd
url = 'https://raw.githubusercontent.com/davidrkearney/Kearney_Data_Science/master/_notebooks/df_panel_fix.csv'

# Skip malformed rows instead of raising. pandas 1.3 replaced the
# `error_bad_lines=False` flag with `on_bad_lines="skip"`; the old flag was
# deprecated then and removed in pandas 2.0, where passing it raises
# TypeError. Fall back to the old spelling only on pandas versions that do
# not accept the new keyword (they reject it with TypeError before any
# download happens).
try:
    df = pd.read_csv(url, on_bad_lines="skip")
except TypeError:
    df = pd.read_csv(url, error_bad_lines=False)
df

import dask.dataframe as dd
# Not referenced by name below. Presumably imported so Dask can read the
# https:// URL (the fsspec HTTP filesystem backend depends on aiohttp); confirm.
import aiohttp  # noqa: F401

# Build a lazy CSV reader over the same URL, split into ~10 MiB partitions,
# then persist it so later cells reuse the loaded partitions instead of
# re-reading the file for every computation.
ddf = dd.read_csv(url, blocksize="10 MiB")
ddf = ddf.persist()
ddf
Dask DataFrame Structure:
Unnamed: 0 province specific general year gdp fdi rnr rr i fr reg it
npartitions=1
int64 object float64 float64 int64 float64 int64 float64 float64 float64 object object int64
... ... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: read-csv, 1 tasks
# Confirm that each partition of the Dask DataFrame is a plain pandas DataFrame.
partition_types = ddf.map_partitions(type)
partition_types.compute()
0    <class 'pandas.core.frame.DataFrame'>
dtype: object
# Peek at the first five rows of the Dask DataFrame (taken from its first partition).
ddf.head(n=5)
Unnamed: 0 province specific general year gdp fdi rnr rr i fr reg it
0 0 Anhui 147002.0 NaN 1996 2093.30 50661 0.0 0.0 0.0 1128873 East China 631930
1 1 Anhui 151981.0 NaN 1997 2347.32 43443 0.0 0.0 0.0 1356287 East China 657860
2 2 Anhui 174930.0 NaN 1998 2542.96 27673 0.0 0.0 0.0 1518236 East China 889463
3 3 Anhui 285324.0 NaN 1999 2712.34 26131 NaN NaN NaN 1646891 East China 1227364
4 4 Anhui 195580.0 32100.0 2000 2902.09 31847 0.0 0.0 0.0 1601508 East China 1499110
# Lazily define the mean GDP per province, then execute it.
gdp = ddf.groupby('province')['gdp'].mean()
gdp.compute()
province
Anhui            3905.870000
Beijing          4673.453333
Chongqing        2477.712500
Fujian           4864.023333
Gansu            1397.832500
Guangdong       15358.781667
Guangxi          2924.104167
Guizhou          1422.010833
Hainan            686.714167
Hebei            6936.825000
Heilongjiang     4041.241667
Henan            7208.966667
Hubei            4772.503333
Hunan            4765.891667
Jiangsu         10761.846667
Jiangxi          2460.782500
Jilin            2274.854167
Liaoning         5231.135000
Ningxia           432.268333
Qinghai           383.099167
Shaanxi          2658.034167
Shandong        12324.002500
Shanghai         6432.454167
Shanxi           2817.210833
Sichuan          5377.790000
Tianjin          2528.665000
Tibet             170.426667
Xinjiang         1828.896667
Yunnan           2604.054167
Zhejiang         9138.151667
Name: gdp, dtype: float64
# Same per-province means as above, ordered from smallest to largest.
gdp_by_province = gdp.compute()
gdp_by_province.sort_values()
province
Tibet             170.426667
Qinghai           383.099167
Ningxia           432.268333
Hainan            686.714167
Gansu            1397.832500
Guizhou          1422.010833
Xinjiang         1828.896667
Jilin            2274.854167
Jiangxi          2460.782500
Chongqing        2477.712500
Tianjin          2528.665000
Yunnan           2604.054167
Shaanxi          2658.034167
Shanxi           2817.210833
Guangxi          2924.104167
Anhui            3905.870000
Heilongjiang     4041.241667
Beijing          4673.453333
Hunan            4765.891667
Hubei            4772.503333
Fujian           4864.023333
Liaoning         5231.135000
Sichuan          5377.790000
Shanghai         6432.454167
Hebei            6936.825000
Henan            7208.966667
Zhejiang         9138.151667
Jiangsu         10761.846667
Shandong        12324.002500
Guangdong       15358.781667
Name: gdp, dtype: float64
# Preview rows whose region mentions 'East China'. regex=False matches the
# text literally. na=False treats missing `reg` values as non-matches, so the
# boolean mask never contains NaN (pandas refuses to index with such a mask).
ddf[ddf.reg.str.contains('East China', regex=False, na=False)].head()
Unnamed: 0 province specific general year gdp fdi rnr rr i fr reg it
0 0 Anhui 147002.0 NaN 1996 2093.30 50661 0.0 0.0 0.0 1128873 East China 631930
1 1 Anhui 151981.0 NaN 1997 2347.32 43443 0.0 0.0 0.0 1356287 East China 657860
2 2 Anhui 174930.0 NaN 1998 2542.96 27673 0.0 0.0 0.0 1518236 East China 889463
3 3 Anhui 285324.0 NaN 1999 2712.34 26131 NaN NaN NaN 1646891 East China 1227364
4 4 Anhui 195580.0 32100.0 2000 2902.09 31847 0.0 0.0 0.0 1601508 East China 1499110
# Restrict to rows whose region mentions 'East China'. The match is literal
# (regex=False), and missing `reg` values count as non-matches (na=False)
# rather than producing a NaN-containing mask that cannot be used to index.
ec = ddf[ddf.reg.str.contains('East China', regex=False, na=False)]

# Per-province mean GDP and number of non-null GDP values within East China.
mean_gdp_prov = ec.groupby('province').gdp.agg(['mean', 'count'])
mean_gdp_prov.compute()
mean count
province
Anhui 3905.870000 12
Fujian 4864.023333 12
Jiangsu 10761.846667 12
Jiangxi 2460.782500 12
Shandong 12324.002500 12
Shanghai 6432.454167 12
Zhejiang 9138.151667 12
# The five East China provinces with the highest mean GDP.
mean_gdp_prov.nlargest(n=5, columns='mean').compute()
mean count
province
Shandong 12324.002500 12
Jiangsu 10761.846667 12
Zhejiang 9138.151667 12
Shanghai 6432.454167 12
Fujian 4864.023333 12
# Dask writes one CSV file per partition and substitutes each partition's
# number for the '*' in the path; the call returns the list of files written.
csv_pattern = 'mean_gdp-*.csv'
mean_gdp_prov.to_csv(csv_pattern)
['/home/jovyan/work/mean_gdp-0.csv']
# Close the client now that the analysis is finished. This client was created
# with n_workers and no scheduler address, so it started its own local
# cluster; per the Dask docs, close() shuts that cluster down as well.
client.close()