Pandas性能优化:进阶篇
在这里介绍一些更高级的pandas优化方法。
1 numpy
我们先来回顾一下上节说过的一个例子
import pandas as pd
import numpy as np
import time
row_number=100000
df = pd.DataFrame({'a': np.random.randn(row_number),
'b': np.random.randn(row_number),
'N': np.random.randint(100, 1000, (row_number)),
'x': np.random.randint(1, 10, (row_number))})
我们要计算a列与b列的乘积
方法1,采用apply
%timeit df.apply( lambda row: row['a']*row['b'],axis=1)
方法2,直接对series做乘法
%timeit df['a']*df['b']
方法3,使用numpy函数
%timeit np.multiply(df['a'].values,df['b'].values)
方法 | 运行时间 | 运行速度 |
---|---|---|
方法1 | 1.45s | 1 |
方法2 | 254µs | 5708 |
方法3 | 41.2 µs | 3536 |
这提示我们,采用一些好的方法可以大幅度提高pandas的运行速度。
cython
我们还继续使用上面的dataframe,现在定义一个函数:
def f(x):
return x * (x - 1)
def integrate_f(a, b, N):
s = 0
dx = (b - a) / N
for i in range(N):
s += f(a + i * dx)
return s * dx
我们要计算每一行integrate_f的值,
方法1,还是apply:
%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N'].astype(int)), axis=1)
这个函数运行时间就较长了:
7.05 s ± 54.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
考虑可以用cython重写程序,提高效率。
在使用cython的时候,可能需要安装gcc环境或者mingw(windows)。
方法1,直接加头编译
%load_ext Cython
%%cython
def f_plain(x):
return x * (x - 1)
def integrate_f_plain(a, b, N):
s = 0
dx = (b - a) / N
for i in range(N):
s += f_plain(a + i * dx)
return s * dx
6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
看来直接加头,效率提升不大。
方法2,使用c type
%%cython
cdef double f_typed(double x) except? -2:
return x * (x - 1)
cpdef double integrate_f_typed(double a, double b, int N):
cdef int i
cdef double s, dx
s = 0
dx = (b - a) / N
for i in range(N):
s += f_typed(a + i * dx)
return s * dx
345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
可以看到,使用cython的特定编程方法,效率提升较大。
numba
numba是一个动态JIT编译器,在一些数值计算中可以大幅度提高运行速度。
我们学cython,在python程序上直接加numba jit的头。
import numba
@numba.jitdef f_plain(x):
return x * (x - 1)
@numba.jitdef integrate_f_numba(a, b, N):
s = 0
dx = (b - a) / N
for i in range(N):
s += f_plain(a + i * dx)
return s * dx
@numba.jitdef apply_integrate_f_numba(col_a, col_b, col_N):
n = len(col_N)
result = np.empty(n, dtype='float64')
assert len(col_a) == len(col_b) == n
for i in range(n):
result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
return result
def compute_numba(df):
result = apply_integrate_f_numba(df['a'].to_numpy(),
df['b'].to_numpy(),
df['N'].to_numpy())
return pd.Series(result, index=df.index, name='result')
%timeit compute_numba(df)
6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!
7 进阶 并行化处理
并行化读取数据
在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。
第一种思路,分块读取,多进程处理。
import pandas as pd
from multiprocessing import Pool
def process(df):
"""
数据处理
"""
pass
# initialise the iterator object
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
skipinitialspace=True, encoding='utf-8')
# depends on how many cores you want to utilise
max_processors = 4# Reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
# 异步处理每个分块
f = pool.apply_async(process, [df])
f_list.append(f)
if len(f_list) >= max_processors:
for f in f_list:
f.get()
del f_list[:]
第二种思路,把大文件拆分成多份,多进程读取。
利用linux中的split命令,将csv切分成p个文件。
!split -l 200000 -d train.csv train_split
#将文件train.csv按每200000行分割,前缀名为train_split,并设置文件命名为数字
代码部分
from multiprocessing import Pool
import pandas as pd
import os
def read_func(file_path):
df = pd.read_csv(file_path, header=None)
return df
def read_file():
file_list=["train_split%02d"%i for i in range(66)]
p = Pool(4)
res = p.map(read_func, file_list)
p.close()
p.join()
df = pd.concat(res, axis=0, ignore_index=True)
return df
df = read_file()
并行化apply
apply的func如果在用了我们之前说的技术优化了速度之后仍然很慢,或者func遇到网络阻塞,那么我们需要去并行化执行apply。这里提供一种处理思路:
import multiprocessing as mp
import time
def slow_func(s):
time.sleep(1)
return "done"with mp.Pool(mp.cpu_count()) as pool:
df['newcol'] = pool.map(slow_func, df['qid'])
6 进阶 第三方pandas库
由于padans的操作如apply,都是单线程的,直接调用效率不高。我可以使用第三方库进行并行操作。
当然第三方库会带来新的代码不兼容问题。我们有时候会考虑像上一章一样,手写并行化处理。这个权衡需要我们在编程之初就要规划好,避免后期因为bug需要重构。
dask库
pip install dask
类pandas库,可以并行读取、运行。
import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import getand the syntax isdata = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)
def some_function(x,y,z):
return x+y+z
res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1))
.compute(get=get)
swifter
pip install swifter
pandas的插件,可以直接在pandas上操作:
import swifter
def some_function(data):
return data * 10
data['out'] = data['in'].swifter.apply(some_function)
Modin库
Modin后端使用dask或者ray,是个支持分布式运行的类pandas库,当然功能异常强大。具体请看官网,这里就不具体介绍了。
https://modin.readthedocs.io/en/latest/using_modin.html
原创文章,作者:flypython,如若转载,请注明出处:http://flypython.com/ml/139.html
您必须登录才能发表评论。