Using OneHotEncoder from dask_ml.preprocessing for categorical encoding with Dask
This post includes code from Scalable-Data-Analysis-in-Python-with-Dask and coiled-examples.
import numpy as np
import dask.array as da
import pandas as pd
import sqlalchemy as db
from sqlalchemy import create_engine
import sqlite3
import pandas as pd
# Engine for the local SQLite database file that holds the fiscal data.
engine = db.create_engine("sqlite:///fiscal.db")
# Not used by the query below; kept so the module-level name stays defined.
metadata = db.MetaData()
# Columns to pull from fiscal_table.
sql = """
SELECT year
, region
, province
, gdp
, fdi
, it
, specific
FROM fiscal_table
"""
# Run the query inside a context manager so the connection is released as
# soon as the frame is loaded (even if the read raises) instead of staying
# open for the rest of the script.
with engine.connect() as connection:
    cnxn = connection  # alias kept from the original script
    df = pd.read_sql(sql, cnxn)
from dask.distributed import Client
# Start a local cluster with processes=False: three workers, two threads
# each, and a '4GB' memory limit.
client = Client(processes=False, threads_per_worker=2,
                n_workers=3, memory_limit='4GB')
# Bare expression: shows the client summary when run interactively and has
# no effect when run as a script.
client
# No client.restart() here. A freshly created cluster has no state to reset,
# and with processes=False the workers run without nannies, so
# Client.restart() would shut them down instead of bringing them back.
from dask import dataframe as dd
# Wrap the pandas frame in a Dask DataFrame split into five partitions.
ddf = dd.from_pandas(df, npartitions=5)
# Print the Dask DataFrame's own repr.
print(ddf)
# Bare expressions, presumably kept from an interactive session: their
# values are neither assigned nor printed here. Note that head() still
# triggers a computation.
ddf.head()
client.id
# Model inputs: numeric features, categorical features, and the target.
feat_list = ["year", "fdi"]
cat_feat_list = ["region", "province"]
target = ["gdp"]

# Cast each numeric column to its dtype, one column assignment at a time
# and in the same order as before.
for column, dtype in (
    ("year", int),
    ("fdi", float),
    ("gdp", float),
    ("it", float),
):
    ddf[column] = ddf[column].astype(dtype)
# One-hot encode the categorical columns and assemble the final frame.
from dask_ml.preprocessing import OneHotEncoder

# The encoder is fitted on columns that have first been categorized.
ddf = ddf.categorize(cat_feat_list)

# NOTE(review): newer scikit-learn / dask-ml releases appear to spell this
# option `sparse_output`; confirm against the installed versions.
ohe = OneHotEncoder(sparse=False)
ohe_ddf = ohe.fit_transform(ddf[cat_feat_list])

# Feature names = numeric features plus the encoded columns, with the raw
# categorical column names filtered out.
feat_list = [
    name
    for name in feat_list + ohe_ddf.columns.tolist()
    if name not in cat_feat_list
]

# Join the encoded columns to the original frame column-wise, then keep only
# the features and the target.
combined = dd.concat([ddf, ohe_ddf], axis=1)
ddf_processed = combined[feat_list + target]

# Materialise the result; the returned frame is not bound to a name.
ddf_processed.compute()
# Close the client once the result has been computed. The earlier
# client.restart() call was dropped: restarting right before closing does no
# useful work, and workers created with processes=False have no nanny to
# bring them back, so restart would only shut them down and then wait for a
# timeout.
client.close()