import dask.bag as db
import re
book_bag = db.from_url('https://www.gutenberg.org/cache/epub/600/pg600.txt')
book_bag.take(5)
(b"\xef\xbb\xbfProject Gutenberg's Notes from the Underground, by Feodor Dostoevsky\r\n",
 b'\r\n',
 b'This eBook is for the use of anyone anywhere at no cost and with\r\n',
 b'almost no restrictions whatsoever.  You may copy it, give it away or\r\n',
 b're-use it under the terms of the Project Gutenberg License included\r\n')
remove_spaces = book_bag.map(lambda x:x.strip())
remove_spaces.take(10)
(b"\xef\xbb\xbfProject Gutenberg's Notes from the Underground, by Feodor Dostoevsky",
 b'',
 b'This eBook is for the use of anyone anywhere at no cost and with',
 b'almost no restrictions whatsoever.  You may copy it, give it away or',
 b're-use it under the terms of the Project Gutenberg License included',
 b'with this eBook or online at www.gutenberg.net',
 b'',
 b'',
 b'Title: Notes from the Underground',
 b'')
def decode_to_ascii(x):
    return x.decode("ascii","ignore") 
ascii_text = remove_spaces.map(decode_to_ascii)
ascii_text.take(10)
("Project Gutenberg's Notes from the Underground, by Feodor Dostoevsky",
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.net',
 '',
 '',
 'Title: Notes from the Underground',
 '')
def remove_punctuation(x):
    return re.sub(r'[^\w\s]','',x)
remove_punctuation = ascii_text.map(remove_punctuation)
remove_punctuation.take(10)
('Project Gutenbergs Notes from the Underground by Feodor Dostoevsky',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever  You may copy it give it away or',
 'reuse it under the terms of the Project Gutenberg License included',
 'with this eBook or online at wwwgutenbergnet',
 '',
 '',
 'Title Notes from the Underground',
 '')
lower_text = remove_punctuation.map(str.lower)
lower_text.take(10)
('project gutenbergs notes from the underground by feodor dostoevsky',
 '',
 'this ebook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever  you may copy it give it away or',
 'reuse it under the terms of the project gutenberg license included',
 'with this ebook or online at wwwgutenbergnet',
 '',
 '',
 'title notes from the underground',
 '')
split_word_list = lower_text.map(lambda x: x.split(' '))
split_word_list.take(10)
(['project',
  'gutenbergs',
  'notes',
  'from',
  'the',
  'underground',
  'by',
  'feodor',
  'dostoevsky'],
 [''],
 ['this',
  'ebook',
  'is',
  'for',
  'the',
  'use',
  'of',
  'anyone',
  'anywhere',
  'at',
  'no',
  'cost',
  'and',
  'with'],
 ['almost',
  'no',
  'restrictions',
  'whatsoever',
  '',
  'you',
  'may',
  'copy',
  'it',
  'give',
  'it',
  'away',
  'or'],
 ['reuse',
  'it',
  'under',
  'the',
  'terms',
  'of',
  'the',
  'project',
  'gutenberg',
  'license',
  'included'],
 ['with', 'this', 'ebook', 'or', 'online', 'at', 'wwwgutenbergnet'],
 [''],
 [''],
 ['title', 'notes', 'from', 'the', 'underground'],
 [''])
def remove_empty_words(word_list):
    return list(filter(lambda a: a != '', word_list))

non_empty_words = split_word_list.filter(remove_empty_words)
non_empty_words.take(10)
(['project',
  'gutenbergs',
  'notes',
  'from',
  'the',
  'underground',
  'by',
  'feodor',
  'dostoevsky'],
 ['this',
  'ebook',
  'is',
  'for',
  'the',
  'use',
  'of',
  'anyone',
  'anywhere',
  'at',
  'no',
  'cost',
  'and',
  'with'],
 ['almost',
  'no',
  'restrictions',
  'whatsoever',
  '',
  'you',
  'may',
  'copy',
  'it',
  'give',
  'it',
  'away',
  'or'],
 ['reuse',
  'it',
  'under',
  'the',
  'terms',
  'of',
  'the',
  'project',
  'gutenberg',
  'license',
  'included'],
 ['with', 'this', 'ebook', 'or', 'online', 'at', 'wwwgutenbergnet'],
 ['title', 'notes', 'from', 'the', 'underground'],
 ['author', 'feodor', 'dostoevsky'],
 ['posting', 'date', 'september', '13', '2008', 'ebook', '600'],
 ['release', 'date', 'july', '1996'],
 ['language', 'english'])
all_words = non_empty_words.flatten()
type(all_words)
dask.bag.core.Bag
all_words.take(30)
('project',
 'gutenbergs',
 'notes',
 'from',
 'the',
 'underground',
 'by',
 'feodor',
 'dostoevsky',
 'this',
 'ebook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'almost',
 'no',
 'restrictions',
 'whatsoever',
 '',
 'you',
 'may')
change_to_key_value = all_words.map(lambda x: (x, 1))
change_to_key_value.take(4)
(('project', 1), ('gutenbergs', 1), ('notes', 1), ('from', 1))
grouped_words = all_words.groupby(lambda x:x)
grouped_words.take(1)
(('project',
  ['project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project',
   'project']),)
word_count = grouped_words.map(lambda x: (x[0], len(x[1])))
word_count.take(10)
(('project', 87),
 ('gutenbergs', 2),
 ('notes', 11),
 ('from', 186),
 ('the', 1555),
 ('underground', 26),
 ('by', 153),
 ('feodor', 3),
 ('dostoevsky', 3),
 ('this', 237))
change_to_key_value.take(10)
(('project', 1),
 ('gutenbergs', 1),
 ('notes', 1),
 ('from', 1),
 ('the', 1),
 ('underground', 1),
 ('by', 1),
 ('feodor', 1),
 ('dostoevsky', 1),
 ('this', 1))
# Take a running count of a word
# In this case, the default value of 
# count needs to be provided
def add_bin_op(count, x):
    return count + x[1]

# Take the output from multiple bin_op(s)
# and add them to get the total count of
# a word
def add_combine_op(x, y):
    return x + y

word_count = change_to_key_value.foldby(lambda x: x[0],
                                       add_bin_op, 0,
                                       add_combine_op)
word_count.take(10)
(('project', 87),
 ('gutenbergs', 2),
 ('notes', 11),
 ('from', 186),
 ('the', 1555),
 ('underground', 26),
 ('by', 153),
 ('feodor', 3),
 ('dostoevsky', 3),
 ('this', 237))
much_easier = all_words.frequencies()
much_easier.take(10)
(('project', 87),
 ('gutenbergs', 2),
 ('notes', 11),
 ('from', 186),
 ('the', 1555),
 ('underground', 26),
 ('by', 153),
 ('feodor', 3),
 ('dostoevsky', 3),
 ('this', 237))

Removing stop words in top word frequency counts

from spacy.lang.en import STOP_WORDS
without_stopwords = all_words.filter(lambda x: x not in STOP_WORDS)
new_freq = without_stopwords.frequencies()
new_freq.take(20)
(('project', 87),
 ('gutenbergs', 2),
 ('notes', 11),
 ('underground', 26),
 ('feodor', 3),
 ('dostoevsky', 3),
 ('ebook', 9),
 ('use', 18),
 ('cost', 5),
 ('restrictions', 3),
 ('whatsoever', 2),
 ('', 1896),
 ('copy', 12),
 ('away', 59),
 ('reuse', 2),
 ('terms', 24),
 ('gutenberg', 28),
 ('license', 15),
 ('included', 6),
 ('online', 4))
new_freq.topk(10)
dask.bag<topk-aggregate, npartitions=1>
new_freq.topk(10, key=lambda x: x[1]).compute()
[('', 1896),
 ('man', 122),
 ('know', 90),
 ('project', 87),
 ('time', 83),
 ('like', 82),
 ('come', 74),
 ('course', 73),
 ('love', 72),
 ('life', 69)]