1. FlyPython首页
  2. 数据科学

Pandas性能优化:进阶篇

Pandas性能优化:进阶篇

在这里介绍一些更高级的pandas优化方法。

1 numpy

我们先来回顾一下上节说过的一个例子

import pandas as pd
import numpy as np
import time
row_number=100000
df = pd.DataFrame({'a': np.random.randn(row_number),
                     'b': np.random.randn(row_number),
                    'N': np.random.randint(100, 1000, (row_number)),
                   'x':  np.random.randint(1, 10, (row_number))})

我们要计算a列与b列的乘积

方法1,采用apply

%timeit df.apply( lambda row: row['a']*row['b'],axis=1)

方法2,直接对series做乘法

%timeit df['a']*df['b']

方法3,使用numpy函数

%timeit  np.multiply(df['a'].values,df['b'].values)
方法运行时间运行速度
方法11.45s1
方法2254µs5708
方法341.2 µs3536

这提示我们,采用一些好的方法可以大幅度提高pandas的运行速度。

cython

我们还继续使用上面的dataframe,现在定义一个函数:

def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

我们要计算每一行integrate_f的值,

方法1,还是apply:

%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N'].astype(int)), axis=1)

这个函数运行时间就较长了:

7.05 s ± 54.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

考虑可以用cython重写程序,提高效率。
在使用cython的时候,可能需要安装gcc环境或者mingw(windows)。

方法1,直接加头编译

%load_ext Cython
%%cython
def f_plain(x):
    return x * (x - 1)

def integrate_f_plain(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

看来直接加头,效率提升不大。

方法2,使用c type

%%cython
cdef double f_typed(double x) except? -2:
     return x * (x - 1)
cpdef double integrate_f_typed(double a, double b, int N):
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_typed(a + i * dx)
    return s * dx

345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

可以看到,使用cython的特定编程方法,效率提升较大。

numba

numba是一个动态JIT编译器,在一些数值计算中可以大幅度提高运行速度。
我们学cython,在python程序上直接加numba jit的头。

import numba

@numba.jitdef f_plain(x):
    return x * (x - 1)

@numba.jitdef integrate_f_numba(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

@numba.jitdef apply_integrate_f_numba(col_a, col_b, col_N):
    n = len(col_N)
    result = np.empty(n, dtype='float64')
    assert len(col_a) == len(col_b) == n
    for i in range(n):
        result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
    return result

def compute_numba(df):
    result = apply_integrate_f_numba(df['a'].to_numpy(),
                                     df['b'].to_numpy(),
                                     df['N'].to_numpy())
    return pd.Series(result, index=df.index, name='result')

 %timeit compute_numba(df)

6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!

7 进阶 并行化处理

并行化读取数据

在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。

第一种思路,分块读取,多进程处理。

import pandas as pd
from multiprocessing import Pool
 
def process(df):
    """
  数据处理
    """
    pass
 
# initialise the iterator object
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
skipinitialspace=True, encoding='utf-8')
# depends on how many cores you want to utilise
max_processors = 4# Reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
    # 异步处理每个分块
    f = pool.apply_async(process, [df])
    f_list.append(f)
    if len(f_list) >= max_processors:
        for f in f_list:
            f.get()
            del f_list[:]

第二种思路,把大文件拆分成多份,多进程读取。

利用linux中的split命令,将csv切分成p个文件。

!split -l 200000 -d train.csv train_split
#将文件train.csv按每200000行分割,前缀名为train_split,并设置文件命名为数字

代码部分

from multiprocessing import Pool
import pandas as pd
import os
 
def read_func(file_path):
    df = pd.read_csv(file_path, header=None)
    return df
 
def read_file():
    file_list=["train_split%02d"%i for i in range(66)]
    p = Pool(4)
    res = p.map(read_func, file_list)
    p.close()
    p.join()
    df = pd.concat(res, axis=0, ignore_index=True)
    return df
 
df = read_file()

并行化apply

apply的func如果在用了我们之前说的技术优化了速度之后仍然很慢,或者func遇到网络阻塞,那么我们需要去并行化执行apply。这里提供一种处理思路:

import multiprocessing as mp
import time
def slow_func(s):
    time.sleep(1)
    return "done"with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(slow_func, df['qid'])

6 进阶 第三方pandas库

由于padans的操作如apply,都是单线程的,直接调用效率不高。我可以使用第三方库进行并行操作。
当然第三方库会带来新的代码不兼容问题。我们有时候会考虑像上一章一样,手写并行化处理。这个权衡需要我们在编程之初就要规划好,避免后期因为bug需要重构。

dask库

pip install dask

类pandas库,可以并行读取、运行。

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import getand the syntax isdata = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def some_function(x,y,z):
    return x+y+z

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1))
.compute(get=get)  

swifter

pip install swifter

pandas的插件,可以直接在pandas上操作:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Modin库

Modin后端使用dask或者ray,是个支持分布式运行的类pandas库,当然功能异常强大。具体请看官网,这里就不具体介绍了。

https://modin.readthedocs.io/en/latest/using_modin.html

原创文章,作者:flypython,如若转载,请注明出处:http://flypython.com/ml/139.html