Python per se is not a Big Data technology. However, in combination with libraries like pandas or PyTables, Python allows the management and analysis of quite large data sets. This IPython Notebook illustrates some Python approaches in this regard.
For our purposes, we define Big Data as a (number of) object(s) and/or data file(s) that do(es) not fit into the memory of a single computer (server, node, etc.) – whatever hardware you are using for data analytics. On such a data file, typical analytics tasks like counting, aggregation and selection shall be implemented.
Sometimes operations on NumPy ndarray objects generate so many temporary objects that the available memory does not suffice to finish the desired operation. An example is a.dot(a.T), i.e. the dot product of an array a with its transpose.
Such an operation needs memory for three arrays: a, a.T and a.dot(a.T). If the array a is sufficiently large, say 50% of the free memory, such an operation is impossible with the usual approach.
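A quick back-of-the-envelope calculation illustrates the point (a minimal sketch, using the array shape of the example below and counting the array, its transpose and the result as three full arrays, as above):
m = n = 13500                # shape of the example array below
a_bytes = m * n * 8          # float64 -> 8 bytes per element, ~1.46 GB
total_bytes = 3 * a_bytes    # a, the transpose and the dot product result
print total_bytes / 1e9      # ~4.37 GB -- more than the 4 GB of RAM used below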
A solution is to work with disk-based arrays and to use memory maps of these arrays.
Some imports first and a check of the free memory.
import numpy as np
import psutil
psutil.phymem_usage()
usage(total=4294967296L, used=2757521408L, free=1053130752L, percent=58.6)
We generate a larger NumPy ndarray object.
m = 13500
n = 13500
%%time
a = np.random.standard_normal((m, n))
CPU times: user 8.38 s, sys: 738 ms, total: 9.11 s Wall time: 9.35 s
a.nbytes
# array of 1.45GB in size
1458000000
Checking memory again – and verifying that the object (reference pointer) indeed owns the data.
psutil.phymem_usage()
# free memory less than needed
# (multiple) copies/temporaries impossible
usage(total=4294967296L, used=3780661248L, free=6787072L, percent=65.6)
a.flags.owndata
# the object owns the in-memory data
True
Simple operations on the in-memory ndarray object.
a[:3, :3]
# sample data
array([[ 0.12601349, -0.10492917,  0.71605343],
       [ 0.49088921,  0.55255455, -0.35605608],
       [ 0.52098822, -0.45979986,  0.80131373]])
%time a.mean()
# reductions work
CPU times: user 377 ms, sys: 100 ms, total: 477 ms Wall time: 478 ms
7.0627656096425453e-05
Now save this object to disk ...
path = './data/'
%time np.save(path + 'od', a)
# save memory array to disk (SSD)
# needs less time than in-memory generation
CPU times: user 81.5 ms, sys: 2.66 s, total: 2.74 s Wall time: 3.52 s
... and delete the in-memory object.
del a
# delete the in-memory version
# to free memory -- somehow ...
# gc does not work "instantly"
psutil.phymem_usage()
# garbage collection does not bring that much ...
# memory usage has not changed significantly
usage(total=4294967296L, used=3764785152L, free=153587712L, percent=63.8)
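If needed, a garbage collection run can be triggered explicitly (a minimal sketch, not in the original notebook; whether the freed memory is handed back to the operating system depends on the platform's memory allocator):
import gc
gc.collect()           # force a full garbage collection run
psutil.phymem_usage()  # check whether more memory is reported as free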
Using the saved object, we generate a new memmap object.
od = np.lib.format.open_memmap(path + 'od.npy', dtype=np.float64, mode='r')
# open memmap array with the array file as data
od.flags.owndata
# object does not own the data
False
It behaves largely the same way as an in-memory ndarray object.
od[:3, :3]
# compare sample data
memmap([[ 0.12601349, -0.10492917,  0.71605343],
        [ 0.49088921,  0.55255455, -0.35605608],
        [ 0.52098822, -0.45979986,  0.80131373]])
%time od.mean()
# operations in NumPy as usual
# somewhat slower of course ...
CPU times: user 832 ms, sys: 2.96 s, total: 3.79 s Wall time: 13.8 s
7.0627656096425453e-05
Major memory problems with NumPy ndarray objects generally arise due to temporary arrays needed to store intermediate results. We therefore generate memmap objects to store intermediate and final results.
First, for the transpose of the array.
tr = np.memmap(path + 'tr.npy', dtype=np.float64, mode='w+', shape=(n, m))
# memmap object for transpose
%time tr[:] = od.T[:]
# write transpose to disk
CPU times: user 14.6 s, sys: 31.7 s, total: 46.3 s Wall time: 2min 59s
Second, for the final results.
re = np.memmap(path + 're.npy', dtype=np.float64, mode='w+', shape=(m, m))
# memmap object for result
%time re[:] = od.dot(tr)[:]
# store results on disk
CPU times: user 8min 2s, sys: 59.1 s, total: 9min 1s Wall time: 4min 6s
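Note that od.dot(tr) still materializes the full (m, m) result as an in-memory temporary before it is copied into re. A block-wise variant (a minimal sketch, not part of the original notebook) keeps the temporaries small:
step = 1000  # rows per block; tune to the available memory
for i in range(0, m, step):
    # each iteration only creates a (step, m) temporary in memory
    re[i:i + step] = od[i:i + step].dot(tr)
re.flush()  # make sure all results are written to disk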
4+ GB of data (od + tr + re) crunched on a notebook with 4 GB of RAM.
ll ./data/
total 8542992
-rw-r--r-- 1 yhilpisch staff 1458000080 27 Mai 12:19 od.npy
-rw-r--r-- 1 yhilpisch staff 1458000000 27 Mai 12:26 re.npy
-rw-r--r-- 1 yhilpisch staff 1458000000 27 Mai 12:23 tr.npy
!rm ./data/*
The futures module (the Python 2 backport of concurrent.futures) allows the use of a separate sub-process for callables.
import futures

# the callable
def generate_array_on_disk(m, n):
    a = np.random.standard_normal((m, n))
    # memory inefficient operation
    np.save(path + 'od.npy', a)
The use of such a sub-process makes sure that any memory used by the sub-process gets freed immediately after the sub-process is terminated. This leaves the free memory of the current process mainly unchanged and avoids the "unpredictable" behaviour of Python garbage collection.
psutil.phymem_usage()
usage(total=4294967296L, used=1613758464L, free=1495994368L, percent=55.1)
%%time
with futures.ProcessPoolExecutor(max_workers=1) as subprocess:
    subprocess.submit(generate_array_on_disk, m, n).result()
    # a separate sub-process is started, the callable is executed
    # the process with all its memory usage is killed
CPU times: user 7.79 ms, sys: 16.6 ms, total: 24.4 ms Wall time: 14.8 s
psutil.phymem_usage()
# meanwhile memory was freed again
usage(total=4294967296L, used=1762353152L, free=1570590720L, percent=43.8)
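The same pattern can be wrapped into a small helper that runs an arbitrary memory-hungry callable in a throw-away sub-process (a minimal sketch; the helper name is made up for illustration and the callable has to be picklable):
def run_in_subprocess(func, *args):
    # execute func(*args) in a single-worker sub-process so that
    # all memory the call allocates is released when the process exits
    with futures.ProcessPoolExecutor(max_workers=1) as pool:
        return pool.submit(func, *args).result()

run_in_subprocess(generate_array_on_disk, m, n)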
Final look and clean-up.
ll ./data/
total 2847664
-rw-r--r-- 1 yhilpisch staff 1458000080 27 Mai 12:27 od.npy
!rm ./data/*
We generate a CSV file on disk that is too large to fit into memory. We process this file with the help of pandas and PyTables.
First, some imports.
import os
import numpy as np
import pandas as pd
import datetime as dt
Number of rows to be generated for the random data set.
N = int(1e7)
N
10000000
Using both random integers and random floats.
ran_int = np.random.randint(0, 10000, size=(2, N))
ran_flo = np.random.standard_normal((2, N))
Path to the files and the filename for the csv file.
path = './data/'
csv_name = path + 'data.csv'
Writing the data row by row.
%%time
with open(csv_name, 'wb') as csv_file:
    header = 'date,int1,int2,flo1,flo2\n'
    csv_file.write(header)
    for _ in xrange(10):
        # 10 times the original data set
        for i in xrange(N):
            row = '%s, %i, %i, %f, %f\n' % \
                  (dt.datetime.now(), ran_int[0, i], ran_int[1, i],
                   ran_flo[0, i], ran_flo[1, i])
            csv_file.write(row)
        print os.path.getsize(csv_name)
597774336 1195552768 1793331200 2391109632 2988888064 3586662400 4184440832 4782219264 5379997696 5977772032 CPU times: user 11min 43s, sys: 14.5 s, total: 11min 57s Wall time: 12min 1s
Delete the original NumPy ndarray objects.
del ran_int
del ran_flo
Reading some rows to check the content.
with open(csv_name, 'rb') as csv_file:
    for _ in xrange(5):
        print csv_file.readline(),
date,int1,int2,flo1,flo2
2014-05-27 12:37:16.375032, 9860, 1887, 0.355085, 1.214623
2014-05-27 12:37:16.375077, 4124, 4043, 0.949081, -0.198921
2014-05-27 12:37:16.375095, 8067, 8088, 0.819238, 0.528587
2014-05-27 12:37:16.375110, 6096, 6302, -0.273824, 0.082087
The filename for the pandas HDFStore.
pd_name = path + 'data.h5p'
h5 = pd.HDFStore(pd_name, 'w')
pandas allows reading data from (large) files chunk-wise via a file iterator.
it = pd.read_csv(csv_name, iterator=True, chunksize=N / 20)
Reading and storing the data chunk-wise (this processes roughly 12 GB of data).
%%time
for i, chunk in enumerate(it):
    h5.append('data', chunk)
    if i % 20 == 0:
        print os.path.getsize(pd_name)
33118654 698528304 1363688670 2028738730 2693894904 3358922431 4024071885 4689231352 5354406128 6019346434 CPU times: user 3min 37s, sys: 48.7 s, total: 4min 26s Wall time: 4min 40s
The resulting HDF5 file – with 100,000,000 rows and five columns.
h5
<class 'pandas.io.pytables.HDFStore'>
File path: ./data/data.h5p
/data    frame_table  (typ->appendable,nrows->100000000,ncols->5,indexers->[index])
The disk-based pandas DataFrame mainly behaves like an in-memory object – but operations like the following are not memory efficient.
# h5['data'].describe()
Data selection and plotting work as with regular pandas DataFrame objects – again not really memory efficient.
# %matplotlib inline
# h5['data']['flo2'][0:N:500].cumsum().plot()
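For memory-efficient analytics, the data can instead be read back in chunks via select with a chunksize, which returns an iterator over DataFrame chunks (a minimal sketch, not part of the original notebook, assuming the store is still open):
total = 0.0
count = 0
for chunk in h5.select('data', chunksize=500000):
    # only one DataFrame chunk is held in memory at any point in time
    total += chunk['flo1'].sum()
    count += len(chunk)
print total / count  # chunk-wise mean of the flo1 column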
h5.close()
The major reason is that the DataFrame data structure is broken up (e.g. into column blocks) during storage. For analytics it has to be put together in memory again.
import tables as tb
h5 = tb.open_file('./data/data.h5p', 'r')
h5
File(filename=./data/data.h5p, title='', mode='r', root_uep='/', filters=Filters(complevel=0, shuffle=False, fletcher32=False, least_significant_digit=None))
/ (RootGroup) ''
/data (Group) ''
/data/table (Table(100000000,)) ''
  description := {
  "index": Int64Col(shape=(), dflt=0, pos=0),
  "values_block_0": Float64Col(shape=(2,), dflt=0.0, pos=1),
  "values_block_1": Int64Col(shape=(2,), dflt=0, pos=2),
  "values_block_2": StringCol(itemsize=26, shape=(1,), dflt='', pos=3)}
  byteorder := 'little'
  chunkshape := (1985,)
  autoindex := True
  colindexes := {
    "index": Index(6, medium, shuffle, zlib(1)).is_csi=False}
h5.close()
The PyTables database file.
import tables as tb
tb_name = path + 'data.h5t'
h5 = tb.open_file(tb_name, 'w')
Using a NumPy rec array object to provide the row description for the PyTables table. To this end, a custom dtype object is needed.
dty = np.dtype([('date', 'S26'), ('int1', '<i8'), ('int2', '<i8'),
('flo1', '<f8'), ('flo2', '<f8')])
# change dtype for date from object to string
Adding compression to the mix (less storage, better backups, better data transfer, etc.).
filters = tb.Filters(complevel=5, complib='blosc')
Again reading and writing chunk-wise, this time appending to a PyTables table object (this again processes roughly 12 GB of uncompressed data).
it = pd.read_csv(csv_name, iterator=True, chunksize=N / 20)
%%time
tab = h5.create_table('/', 'data',
                      np.array(it.read().to_records(index=False), dty),
                      filters=filters)
# initialize the table object by using the first chunk and the adjusted dtype
for chunk in it:
    tab.append(chunk.to_records(index=False))
tab.flush()
CPU times: user 6min 49s, sys: 6min 33s, total: 13min 23s Wall time: 24min 30s
The resulting table object – 100,000,000 rows and five columns.
h5.get_filesize()
2139506844L
tab
/data (Table(100000000,), shuffle, blosc(5)) ''
  description := {
  "date": StringCol(itemsize=26, shape=(), dflt='', pos=0),
  "int1": Int64Col(shape=(), dflt=0, pos=1),
  "int2": Int64Col(shape=(), dflt=0, pos=2),
  "flo1": Float64Col(shape=(), dflt=0.0, pos=3),
  "flo2": Float64Col(shape=(), dflt=0.0, pos=4)}
  byteorder := 'little'
  chunkshape := (9039,)
Data on disk can be used as if it were both in memory and uncompressed; decompression is done at run time.
tab[N:N + 3]
# slicing row-wise
array([('2014-05-27 12:38:30.955685', 9860, 1887, 0.355085, 1.214623),
       ('2014-05-27 12:38:30.955698', 4124, 4043, 0.949081, -0.19892100000000001),
       ('2014-05-27 12:38:30.955705', 8067, 8088, 0.8192379999999999, 0.528587)],
      dtype=[('date', 'S26'), ('int1', '<i8'), ('int2', '<i8'), ('flo1', '<f8'), ('flo2', '<f8')])
tab[N:N + 3]['date']
# access selected data points
array(['2014-05-27 12:38:30.955685', '2014-05-27 12:38:30.955698',
       '2014-05-27 12:38:30.955705'],
      dtype='|S26')
Counting rows is easily accomplished (although not really needed here).
%time len(tab[:]['flo1'])
# length of column (object)
CPU times: user 19.2 s, sys: 7.79 s, total: 27 s Wall time: 48 s
100000000
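Loading the whole column just to count it is unnecessarily expensive; the table object itself keeps the row count as metadata (a quick aside, not in the original notebook):
tab.nrows  # row count from table metadata -- no column data is read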
Aggregation operations, like summing up or calculating the mean value, are another application area.
%time tab[:]['flo1'].sum()
# sum over column
CPU times: user 21.9 s, sys: 30.7 s, total: 52.6 s Wall time: 1min 45s
-1200.4380800047452
%time tab[:]['flo1'].mean()
# mean over column
CPU times: user 22 s, sys: 30.9 s, total: 52.9 s Wall time: 1min 50s
-1.2004380800047452e-05
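Slicing the complete column with tab[:] pulls all 100,000,000 values into memory at once. A chunk-wise aggregation (a minimal sketch under the same table layout) keeps the memory footprint small:
chunk = 5000000  # rows per chunk
s = 0.0
for start in range(0, tab.nrows, chunk):
    # read and decompress only one slice of the column at a time
    s += tab[start:start + chunk]['flo1'].sum()
print s / tab.nrows  # mean of flo1, computed out-of-core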
Typical SQL-like conditions and queries can be added.
%time sum([row['flo2'] for row in tab.where('(flo1 > 3) & (int2 < 1000)')])
# sum combined with condition
CPU times: user 19.3 s, sys: 4.31 s, total: 23.6 s Wall time: 12.9 s
150.46715999999984
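Alternatively, PyTables can evaluate the condition itself and return only the matching rows as a NumPy structured array via read_where (a minimal sketch of the same query):
hits = tab.read_where('(flo1 > 3) & (int2 < 1000)')
# hits holds only the matching rows
print hits['flo2'].sum()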
h5.close()
All operations have been on data sets that do not fit (if uncompressed) into the memory of the machine they have been implemented on.
ll data/*
-rw-r--r-- 1 yhilpisch staff 5977776071 27 Mai 12:49 data/data.csv
-rw-r--r-- 1 yhilpisch staff 6651285042 27 Mai 12:53 data/data.h5p
-rw-r--r-- 2 yhilpisch staff 2140122377 27 Mai 13:23 data/data.h5t
Using compression of course reduces the size of the PyTables table object relative to the csv and the pandas HDFStore files. This might, in certain circumstances, lead to file sizes that would again fit in memory.
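Based on the file sizes listed above, the compression ratio can be quantified with a small back-of-the-envelope calculation:
csv_size = 5977776071  # size of data.csv as listed above
tab_size = 2140122377  # size of the compressed data.h5t
print round(csv_size / float(tab_size), 2)  # ~2.79 -- the table is almost 3x smaller than the CSV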
!rm data/*
Python and libraries like NumPy, pandas and PyTables provide useful means and approaches to circumvent the limitations of free memory on a single computer (node, server, etc.).
Key to the performance of such out-of-memory operations are mainly the storage hardware (speed/capacity), the data format used (e.g. HDF5 vs. relational databases) and, in some scenarios, the use of performant compression algorithms.
Read/write speeds of SSD hardware are evolving fast:
http://fastestssd.com: "800GB Intel SSD Announced, Offers 2 GB/s Throughputs"
The Python Quants GmbH – the company Web site
Dr. Yves J. Hilpisch – my personal Web site
Python for Finance – my NEW book (out as Early Release)
Derivatives Analytics with Python – my derivatives analytics book
www.derivatives-analytics-with-python.com
Contact Us