# Stdlib imports.
import sqlite3

# Third-party imports.
import numpy as np
import dask.array as da
import pandas as pd  # was imported twice in the original; once is enough
import sqlalchemy as db
from sqlalchemy import create_engine

# SQLAlchemy engine over the local SQLite file holding the fiscal data.
engine = db.create_engine("sqlite:///fiscal.db")
connection = engine.connect()
metadata = db.MetaData()
# Disabled sanity probe. NOTE(review): it queries `fiscal_data`, while the
# SELECT below reads `fiscal_table` — confirm which table name is correct.
#engine.execute("SELECT * FROM fiscal_data LIMIT 1").fetchall()
# Pull every modeling column (year, region, province, gdp, fdi, it, specific)
# from fiscal_table into memory.
sql = """
SELECT year
, region
, province
, gdp
, fdi
, it
, specific
FROM fiscal_table
"""

cnxn = connection  # alias for the SQLAlchemy connection opened above
# Materialize the full result set as a pandas DataFrame.
df = pd.read_sql(sql, cnxn)
from dask.distributed import Client

# Start a local, in-process (threaded) Dask cluster:
# 3 workers x 2 threads each, capped at 4GB of memory per worker.
client = Client(
    n_workers=3,
    threads_per_worker=2,
    processes=False,
    memory_limit='4GB',
)
client  # bare expression so the notebook renders the cluster summary

Client

Cluster

  • Workers: 3
  • Cores: 6
  • Memory: 12.00 GB
# Restart all workers, dropping any state held on the cluster so the
# preprocessing below starts from a clean slate.
client.restart()

Client

Cluster

  • Workers: 3
  • Cores: 6
  • Memory: 12.00 GB
from dask import dataframe as dd 
# Split the in-memory pandas frame into a 5-partition Dask DataFrame so the
# downstream preprocessing can be distributed across the local workers.
ddf = dd.from_pandas(df, npartitions=5)
print(ddf)
Dask DataFrame Structure:
                year  region province      gdp    fdi     it specific
npartitions=5                                                        
0              int64  object   object  float64  int64  int64  float64
72               ...     ...      ...      ...    ...    ...      ...
...              ...     ...      ...      ...    ...    ...      ...
288              ...     ...      ...      ...    ...    ...      ...
359              ...     ...      ...      ...    ...    ...      ...
Dask Name: from_pandas, 5 tasks
# Preview the first rows (head() eagerly computes a small sample).
ddf.head()
year region province gdp fdi it specific
0 1996 East China Anhui 2093.30 50661 631930 147002.0
1 1997 East China Anhui 2347.32 43443 657860 151981.0
2 1998 East China Anhui 2542.96 27673 889463 174930.0
3 1999 East China Anhui 2712.34 26131 1227364 285324.0
4 2000 East China Anhui 2902.09 31847 1499110 195580.0
# Unique identifier of this client session (shown in the output below).
client.id
'Client-0ac0cc94-0e22-11eb-a4ae-d71460f30774'
# Select features and target for the regression.
feat_list = ["year", "fdi"]             # numeric predictors
cat_feat_list = ["region", "province"]  # categoricals, one-hot encoded below
target = ["gdp"]                        # regression target

# Normalize the numeric columns to consistent dtypes.
for _col, _dtype in [("year", int), ("fdi", float), ("gdp", float), ("it", float)]:
    ddf[_col] = ddf[_col].astype(_dtype)
# One-hot encode the categorical features.
from dask_ml.preprocessing import OneHotEncoder

# dask_ml's encoder needs known categories, hence categorize() first.
ddf = ddf.categorize(cat_feat_list)
ohe = OneHotEncoder(sparse=False)
ohe_ddf = ohe.fit_transform(ddf[cat_feat_list])

# Swap the raw categorical names for the encoded dummy-column names.
feat_list = [
    f
    for f in feat_list + ohe_ddf.columns.tolist()
    if f not in cat_feat_list
]

# Final modeling frame: numeric + encoded features, target last.
ddf_processed = dd.concat([ddf, ohe_ddf], axis=1)[feat_list + target]
ddf_processed.compute()
year fdi region_East China region_North China region_Southwest China region_Northwest China region_South Central China region_Northeast China province_Anhui province_Beijing ... province_Shandong province_Shanghai province_Shanxi province_Sichuan province_Tianjin province_Tibet province_Xinjiang province_Yunnan province_Zhejiang gdp
0 1996 50661.0 1.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 2093.30
1 1997 43443.0 1.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 2347.32
2 1998 27673.0 1.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 2542.96
3 1999 26131.0 1.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 2712.34
4 2000 31847.0 1.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 2902.09
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
355 2003 498055.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 9705.02
356 2004 668128.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 11648.70
357 2005 772000.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 13417.68
358 2006 888935.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 15718.47
359 2007 1036576.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 18753.73

360 rows × 39 columns

# Tear down: restart() immediately before close() did no useful work, so it
# is dropped; close the cluster and release the DB connection (the original
# leaked it).
client.close()
connection.close()