The Python Quants



Out-of-Memory Data Analytics with Python

Yves Hilpisch

yves@pythonquants.com

www.pythonquants.com

The Python Quants GmbH

Python and Big Data

Python per se is not a Big Data technology. However, in combination with libraries like pandas or PyTables, Python allows for the management and analysis of quite large data sets. This IPython Notebook illustrates some Python approaches in this regard.

For our purposes, we define Big Data as an object or data file (or a collection of them) that does not fit into the memory of a single computer (server, node, etc.), whatever hardware you are using for data analytics. On such data, typical analytics tasks, like counting, aggregation and selection, shall be implemented.

Out-of-Memory Analytics with NumPy

Sometimes operations on NumPy ndarray objects generate so many temporary objects that the available memory does not suffice to finish the desired operation. An example is a.dot(a.T), i.e. the dot product of an array a with its own transpose.

Such an operation needs memory for the operands and the result, i.e. for a, a.T and a.dot(a.T) (in NumPy, a.T is merely a view, but for a square a the result array is as large as a itself). If the array a alone takes up a large share of the free memory, say 50%, such an operation is impossible with the usual in-memory approach.
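A quick back-of-the-envelope calculation (a sketch, using the array dimensions chosen below) illustrates the point: with 13,500 x 13,500 double-precision numbers, the operand and the result array alone already approach the RAM of a typical 4 GB machine.

m = n = 13500
bytes_a = m * n * 8
  # operand a: 13,500 x 13,500 float64 values
bytes_res = m * m * 8
  # result a.dot(a.T): same shape and size
(bytes_a + bytes_res) / 1e9
  # roughly 2.9 GB, before any temporaries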

A solution is to work with disk-based arrays and to use memory maps of these arrays.

Some imports first and a check of the free memory.

In [1]:
import numpy as np
import psutil
In [2]:
psutil.phymem_usage()
Out[2]:
usage(total=4294967296L, used=2757521408L, free=1053130752L, percent=58.6)

Sample Data

We generate a larger NumPy ndarray object.

In [3]:
m = 13500
n = 13500
In [4]:
%%time
a = np.random.standard_normal((m, n))
CPU times: user 8.38 s, sys: 738 ms, total: 9.11 s
Wall time: 9.35 s

In [5]:
a.nbytes
  # array of 1.45GB in size
Out[5]:
1458000000

Checking memory again, and verifying that the object indeed owns the in-memory data.

In [6]:
psutil.phymem_usage()
  # free memory less than needed
  # (multiple) copies/temporaries impossible
Out[6]:
usage(total=4294967296L, used=3780661248L, free=6787072L, percent=65.6)
In [7]:
a.flags.owndata
  # the object owns the in-memory data
Out[7]:
True

Simple operations on the in-memory ndarray object.

In [8]:
a[:3, :3]
  # sample data
Out[8]:
array([[ 0.12601349, -0.10492917,  0.71605343],
       [ 0.49088921,  0.55255455, -0.35605608],
       [ 0.52098822, -0.45979986,  0.80131373]])
In [9]:
%time a.mean()
  # reductions work
CPU times: user 377 ms, sys: 100 ms, total: 477 ms
Wall time: 478 ms

Out[9]:
7.0627656096425453e-05

Now save this object to disk ...

In [10]:
path = './data/'
In [11]:
%time np.save(path + 'od', a)
  # save memory array to disk (SSD)
  # needs less time than in-memory generation
CPU times: user 81.5 ms, sys: 2.66 s, total: 2.74 s
Wall time: 3.52 s

... and delete the in-memory object.

In [12]:
del a
  # delete the in-memory version
  # to free memory -- somehow ...
  # gc does not work "instantly"
In [13]:
psutil.phymem_usage()
  # garbage collection does not bring that much ...
  # memory usage has not changed significantly
Out[13]:
usage(total=4294967296L, used=3764785152L, free=153587712L, percent=63.8)

Memory Map of Data

Using the saved object, we generate a new memmap object.

In [14]:
od = np.lib.format.open_memmap(path + 'od.npy', dtype=np.float64, mode='r')
  # open memmap array with the array file as data
In [15]:
od.flags.owndata
  # object does not own the data
Out[15]:
False

It mainly behaves the same way as an in-memory ndarray object.

In [16]:
od[:3, :3]
  # compare sample data
Out[16]:
memmap([[ 0.12601349, -0.10492917,  0.71605343],
       [ 0.49088921,  0.55255455, -0.35605608],
       [ 0.52098822, -0.45979986,  0.80131373]])
In [17]:
%time od.mean()
  # operations in NumPy as usual
  # somewhat slower of course ...
CPU times: user 832 ms, sys: 2.96 s, total: 3.79 s
Wall time: 13.8 s

Out[17]:
7.0627656096425453e-05

Memory Maps of (Intermediate) Results

Major memory problems with NumPy ndarray objects generally arise due to temporary arrays needed to store intermediate results. We therefore generate memmap objects to store intermediate and final results.

First, for the transpose of the array.

In [18]:
tr = np.memmap(path + 'tr.npy', dtype=np.float64, mode='w+', shape=(n, m))
  # memmap object for transpose
In [19]:
%time tr[:] = od.T[:]
  # write transpose to disk
CPU times: user 14.6 s, sys: 31.7 s, total: 46.3 s
Wall time: 2min 59s

Second, for the final results.

In [20]:
re = np.memmap(path + 're.npy', dtype=np.float64, mode='w+', shape=(m, m))
  # memmap object for result
In [21]:
%time re[:] = od.dot(tr)[:]
  # store results on disk
CPU times: user 8min 2s, sys: 59.1 s, total: 9min 1s
Wall time: 4min 6s
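Note that od.dot(tr) still materializes the full 13,500 x 13,500 result array in memory before it is copied into re. A block-wise variant (a minimal sketch, reusing m, od, tr and re from above; the block size is an arbitrary choice) would keep the memory footprint at a fraction of that:

bs = 1500
  # block size (arbitrary choice)
for i in range(0, m, bs):
    re[i:i + bs] = od[i:i + bs].dot(tr)
      # only a (bs x n) operand slice and a (bs x m) result block
      # are held in memory at any point in time
re.flush()
  # make sure all blocks are written to disk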

Final Look and Cleaning Up

4+ GB of data (od + tr + re) crunched on a notebook with 4 GB of RAM.

In [23]:
ll ./data/
total 8542992
-rw-r--r--  1 yhilpisch  staff  1458000080 27 Mai 12:19 od.npy
-rw-r--r--  1 yhilpisch  staff  1458000000 27 Mai 12:26 re.npy
-rw-r--r--  1 yhilpisch  staff  1458000000 27 Mai 12:23 tr.npy

In [24]:
!rm ./data/*

Using a Sub-Process

The futures module (the Python 2 backport of the concurrent.futures standard library module) allows executing a callable in a separate sub-process.

In [25]:
import futures
In [26]:
# the callable
def generate_array_on_disk(m, n):
    a = np.random.standard_normal((m, n))
      # memory inefficient operation
    np.save(path + 'od.npy', a)

The use of such a sub-process makes sure that any memory used by it gets freed immediately once the sub-process terminates. This leaves the memory footprint of the main process mainly unchanged and avoids the somewhat "unpredictable" behaviour of Python's garbage collection. (A Python 3 version of the pattern is sketched after the memory check below.)

In [27]:
psutil.phymem_usage()
Out[27]:
usage(total=4294967296L, used=1613758464L, free=1495994368L, percent=55.1)
In [28]:
%%time
with futures.ProcessPoolExecutor(max_workers=1) as subprocess:
    subprocess.submit(generate_array_on_disk, m, n).result()
  # separate sub-process is started, the callable is executed
  # the process with all its memory usage is killed
CPU times: user 7.79 ms, sys: 16.6 ms, total: 24.4 ms
Wall time: 14.8 s

In [29]:
psutil.phymem_usage()
  # meanwhile memory was freed again
Out[29]:
usage(total=4294967296L, used=1762353152L, free=1570590720L, percent=43.8)
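For reference, on Python 3 the same pattern works with the standard-library concurrent.futures module, no separate futures package needed. A minimal, self-contained sketch:

import numpy as np
from concurrent.futures import ProcessPoolExecutor

def generate_array_on_disk(m, n):
    a = np.random.standard_normal((m, n))
      # memory-intensive operation in the worker process
    np.save('./data/od.npy', a)
      # result is persisted to disk

with ProcessPoolExecutor(max_workers=1) as executor:
    executor.submit(generate_array_on_disk, 13500, 13500).result()
      # all memory used by the worker process is released when it terminates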

Final look and clean-up.

In [30]:
ll ./data/
total 2847664
-rw-r--r--  1 yhilpisch  staff  1458000080 27 Mai 12:27 od.npy

In [31]:
!rm ./data/*

Processing (Too) Large CSV Files

We generate a CSV file on disk that is too large to fit into memory. We process this file with the help of pandas and PyTables.

First, some imports.

In [32]:
import os
import numpy as np
import pandas as pd
import datetime as dt

Generating an Example CSV File

Number of rows to be generated for random data set.

In [33]:
N = int(1e7)
N
Out[33]:
10000000

Using both random integers and random floats.

In [34]:
ran_int = np.random.randint(0, 10000, size=(2, N))
ran_flo = np.random.standard_normal((2, N))

Path to the files and filename for the CSV file.

In [35]:
path = './data/'
csv_name = path + 'data.csv'

Writing the data row by row.

In [36]:
%%time
with open(csv_name, 'wb') as csv_file:
    header = 'date,int1,int2,flo1,flo2\n'
    csv_file.write(header)
    for _ in xrange(10):
        # 10 times the original data set
        for i in xrange(N):
            row = '%s, %i, %i, %f, %f\n' % \
                    (dt.datetime.now(), ran_int[0, i], ran_int[1, i],
                                    ran_flo[0, i], ran_flo[1, i])
            csv_file.write(row)
        print os.path.getsize(csv_name)
597774336
1195552768
1793331200
2391109632
2988888064
3586662400
4184440832
4782219264
5379997696
5977772032
CPU times: user 11min 43s, sys: 14.5 s, total: 11min 57s
Wall time: 12min 1s

Delete the original NumPy ndarray objects.

In [37]:
del ran_int
del ran_flo

Reading some rows to check the content.

In [38]:
with open(csv_name, 'rb') as csv_file:
    for _ in xrange(5):
        print csv_file.readline(),
date,int1,int2,flo1,flo2
2014-05-27 12:37:16.375032, 9860, 1887, 0.355085, 1.214623
2014-05-27 12:37:16.375077, 4124, 4043, 0.949081, -0.198921
2014-05-27 12:37:16.375095, 8067, 8088, 0.819238, 0.528587
2014-05-27 12:37:16.375110, 6096, 6302, -0.273824, 0.082087

Reading and Writing with pandas

The filename for the pandas HDFStore.

In [39]:
pd_name = path + 'data.h5p'
In [40]:
h5 = pd.HDFStore(pd_name, 'w')

pandas allows reading data from (large) files chunk-wise via a file iterator.

In [41]:
it = pd.read_csv(csv_name, iterator=True, chunksize=N / 20)

Reading and storing the data chunk-wise (this processes roughly 12 GB of data in total: about 6 GB read from the CSV file and about 6.5 GB written to the HDF5 file).

In [42]:
%%time
for i, chunk in enumerate(it):
    h5.append('data', chunk)
    if i % 20 == 0:
        print os.path.getsize(pd_name)
33118654
698528304
1363688670
2028738730
2693894904
3358922431
4024071885
4689231352
5354406128
6019346434
CPU times: user 3min 37s, sys: 48.7 s, total: 4min 26s
Wall time: 4min 40s

The resulting HDF5 file – with 100,000,000 rows and five columns.

In [43]:
h5
Out[43]:
<class 'pandas.io.pytables.HDFStore'>
File path: ./data/data.h5p
/data            frame_table  (typ->appendable,nrows->100000000,ncols->5,indexers->[index])

Disk-Based Analytics with pandas

The disk-based pandas DataFrame mainly behaves like an in-memory object. However, operations like the following read the complete data set into memory and are therefore not memory efficient.

In [44]:
# h5['data'].describe()

Data selection and plotting work as with regular pandas DataFrame objects, but again not in a memory-efficient way; see the chunk-wise alternative sketched below.

In [45]:
# %matplotlib inline
# h5['data']['flo2'][0:N:500].cumsum().plot()
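A more memory-friendly alternative (a sketch, relying on the appendable table format created above; the chunk size is an arbitrary choice) is to iterate over the stored table chunk-wise via the select method of the HDFStore:

total = 0.0
for chunk in h5.select('data', chunksize=1000000):
    # only one chunk (a regular DataFrame) is in memory at any time
    total += chunk['flo2'].sum()
total
  # where-based queries (e.g. where='flo1 > 3') would additionally require
  # the store to have been written with data_columns specified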
In [46]:
h5.close()

The major reason is that the DataFrame data structure is broken up into blocks (e.g. of columns) during storage, as the following PyTables inspection of the file shows. For analytics, it has to be reassembled in memory.

In [47]:
import tables as tb
h5 = tb.open_file('./data/data.h5p', 'r')
h5
Out[47]:
File(filename=./data/data.h5p, title='', mode='r', root_uep='/', filters=Filters(complevel=0, shuffle=False, fletcher32=False, least_significant_digit=None))
/ (RootGroup) ''
/data (Group) ''
/data/table (Table(100000000,)) ''
  description := {
  "index": Int64Col(shape=(), dflt=0, pos=0),
  "values_block_0": Float64Col(shape=(2,), dflt=0.0, pos=1),
  "values_block_1": Int64Col(shape=(2,), dflt=0, pos=2),
  "values_block_2": StringCol(itemsize=26, shape=(1,), dflt='', pos=3)}
  byteorder := 'little'
  chunkshape := (1985,)
  autoindex := True
  colindexes := {
    "index": Index(6, medium, shuffle, zlib(1)).is_csi=False}
In [48]:
h5.close()

Reading with pandas and Writing with PyTables

The PyTables database file.

In [49]:
import tables as tb
In [50]:
tb_name = path + 'data.h5t'
In [51]:
h5 = tb.open_file(tb_name, 'w')

We use a NumPy record array to provide the row description for the PyTables table. To this end, a custom dtype object is needed.

In [52]:
dty = np.dtype([('date', 'S26'), ('int1', '<i8'), ('int2', '<i8'),
                                 ('flo1', '<f8'), ('flo2', '<f8')])
  # change dtype for date from object to string

Adding compression to the mix (less storage space, faster backups and data transfers, etc.).

In [53]:
filters = tb.Filters(complevel=5, complib='blosc')

Again reading and writing chunk-wise, this time appending to a PyTables table object (this again processes roughly 12 GB of uncompressed data).

In [54]:
it = pd.read_csv(csv_name, iterator=True, chunksize=N / 20)
In [55]:
%%time
tab = h5.create_table('/', 'data', np.array(it.read().to_records(index=False), dty),
                      filters=filters)
  # initialize table object by using first chunk and adjusted dtype
for chunk in it:
    tab.append(chunk.to_records(index=False))
    tab.flush()
CPU times: user 6min 49s, sys: 6min 33s, total: 13min 23s
Wall time: 24min 30s

The resulting table object – 100,000,000 rows and five columns.

In [56]:
h5.get_filesize()
Out[56]:
2139506844L
In [57]:
tab
Out[57]:
/data (Table(100000000,), shuffle, blosc(5)) ''
  description := {
  "date": StringCol(itemsize=26, shape=(), dflt='', pos=0),
  "int1": Int64Col(shape=(), dflt=0, pos=1),
  "int2": Int64Col(shape=(), dflt=0, pos=2),
  "flo1": Float64Col(shape=(), dflt=0.0, pos=3),
  "flo2": Float64Col(shape=(), dflt=0.0, pos=4)}
  byteorder := 'little'
  chunkshape := (9039,)

Out-of-Memory Analytics with PyTables

The data on disk can be used as if it were both in memory and uncompressed; decompression is done at run time.

In [58]:
tab[N:N + 3]
  # slicing row-wise
Out[58]:
array([('2014-05-27 12:38:30.955685', 9860, 1887, 0.355085, 1.214623),
       ('2014-05-27 12:38:30.955698', 4124, 4043, 0.949081, -0.19892100000000001),
       ('2014-05-27 12:38:30.955705', 8067, 8088, 0.8192379999999999, 0.528587)], 
      dtype=[('date', 'S26'), ('int1', '<i8'), ('int2', '<i8'), ('flo1', '<f8'), ('flo2', '<f8')])
In [59]:
tab[N:N + 3]['date']
  # access selected data points
Out[59]:
array(['2014-05-27 12:38:30.955685', '2014-05-27 12:38:30.955698',
       '2014-05-27 12:38:30.955705'], 
      dtype='|S26')

Counting rows is easily accomplished (although not really needed here).

In [60]:
%time len(tab[:]['flo1'])
  # length of column (object)
CPU times: user 19.2 s, sys: 7.79 s, total: 27 s
Wall time: 48 s

Out[60]:
100000000
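As a side note (not part of the timing above): the table object keeps its row count as metadata, so counting does not require reading any column data at all.

tab.nrows
  # row count stored as metadata
len(tab)
  # equivalent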

Aggregation operations, like summing up or calculating the mean value, are another application area.

In [61]:
%time tab[:]['flo1'].sum()
  # sum over column
CPU times: user 21.9 s, sys: 30.7 s, total: 52.6 s
Wall time: 1min 45s

Out[61]:
-1200.4380800047452
In [62]:
%time tab[:]['flo1'].mean()
  # mean over column
CPU times: user 22 s, sys: 30.9 s, total: 52.9 s
Wall time: 1min 50s

Out[62]:
-1.2004380800047452e-05
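Note that tab[:]['flo1'] first reads the complete column into memory. A chunk-wise aggregation (a minimal sketch, with an arbitrarily chosen chunk size) keeps the operation truly out-of-memory:

cs = 5000000
  # chunk size (arbitrary choice)
s = 0.0
count = 0
for start in range(0, tab.nrows, cs):
    col = tab.cols.flo1[start:start + cs]
      # read one chunk of the column from disk
    s += col.sum()
    count += len(col)
s / count
  # chunk-wise mean over the flo1 column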

Typical SQL-like conditions and queries can be added.

In [63]:
%time sum([row['flo2'] for row in tab.where('(flo1 > 3) & (int2 < 1000)')])
  # sum combined with condition
CPU times: user 19.3 s, sys: 4.31 s, total: 23.6 s
Wall time: 12.9 s

Out[63]:
150.46715999999984
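Alternatively, PyTables can return all matching rows in a single call via read_where (a sketch using the same condition):

res = tab.read_where('(flo1 > 3) & (int2 < 1000)')
  # structured array containing the matching rows only
res['flo2'].sum()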
In [64]:
h5.close()

Overview

All operations have been implemented on data sets that do not fit (uncompressed) into the memory of the machine they have been run on.

In [65]:
ll data/*
-rw-r--r--  1 yhilpisch  staff  5977776071 27 Mai 12:49 data/data.csv
-rw-r--r--  1 yhilpisch  staff  6651285042 27 Mai 12:53 data/data.h5p
-rw-r--r--  2 yhilpisch  staff  2140122377 27 Mai 13:23 data/data.h5t

Using compression of course reduces the size of the PyTables table object relative to the CSV file and the pandas HDFStore. This might, in certain circumstances, lead to file sizes that would again fit into memory.
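Based on the file sizes listed above, the compression factor can be quantified roughly as follows (a small sanity check on the numbers shown, not a new measurement):

5977776071. / 2140122377
  # CSV file vs. compressed table: factor of roughly 2.8
6651285042. / 2140122377
  # pandas HDFStore vs. compressed table: factor of roughly 3.1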

In [66]:
!rm data/*

Conclusions

Python and libraries like NumPy, pandas and PyTables provide useful means to circumvent the limitations of the free memory of a single computer (node, server, etc.).

Key to the performance of such out-of-memory operations are mainly the storage hardware (speed and capacity), the data format used (e.g. HDF5 vs. relational databases) and, in some scenarios, the use of fast compression algorithms.

Read/write speeds of SSD hardware are evolving fast:

  • status quo: 512 MB/s reading/writing (e.g. MacBook)
  • available: 1 GB/s reading/writing (e.g. for servers)
  • state-of-the-art: 2 GB/s reading & 1 GB/s writing speed (e.g. from Intel)

http://fastestssd.com: "800GB Intel SSD Announced, Offers 2 GB/s Throughputs"

The Python Quants



The Python Quants GmbH – the company Web site

www.pythonquants.com

Dr. Yves J. Hilpisch – my personal Web site

www.hilpisch.com

Python for Finance – my NEW book (out as Early Release)

Python for Finance (O'Reilly)

Derivatives Analytics with Python – my derivatives analytics book

www.derivatives-analytics-with-python.com

Contact Us

yves@pythonquants.com | @dyjh