Pythonでクラス内の関数を並列演算させる

オブジェクト指向プログラミング+multiprocessing.Poolでの並列処理でハマりました。
クラスの中でPoolライクな文法で並列演算を行なうことのできるMyPoolを自作したのでその紹介です。

Pythonでの並列処理について

Pythonにはver2.6以降からmultiprocessingという便利なものがあります。
これのPoolという機能を使うと、

test.py

# -*- coding: utf-8 -*-
from multiprocessing import Pool

def fuga(x): # 並列実行したい関数
    return x*x

def hoge():
    p = Pool(8) # 8スレッドで実行
    print p.map(fuga, range(10)) # fugaに0,1,..のそれぞれを与えて並列演算

if __name__ == "__main__":
    hoge()

出力結果

$ python test.py
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

このようにたったこれだけで並列処理ができてしまいます。
とっても便利。
fugaという関数に、range(10)つまり0〜9までのそれぞれの値を入れたときの結果を8スレッドの並列演算をして求めているわけです。
関数にいろいろなデータを処理させたいときにそのデータをリストにして、処理結果をまたリストでもらってくるという方法はとても直感的です。

ただしこのPool、実は以下のコードでは動いてくれません。
test.py

from multiprocessing import Pool

def hoge():
    def fuga(x):
        return x*x
    p = Pool(8)
    print p.map(fuga, range(10))

if __name__ == "__main__":
    hoge()

出力結果

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 525, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

どうやら並列演算を行なう関数がローカル関数であると動かないようです。
(公式なルートで確認したわけではありません)

ついでに以下も同様のエラーを出して動きません。
test.py

from multiprocessing import Pool

class Test:
    def fuga(self, x):
        return x*x

    def hoge(self):
        p = Pool(8)
        print p.map(self.fuga, range(10))

if __name__ == "__main__":
    test = Test()
    test.hoge()

同じように並列演算を行なう関数がクラスのメンバ関数だと動かないようです。

ローカル関数が動かないのはあまり困りませんが、オブジェクト指向でプログラミングしているとクラスのメンバ関数が動いてくれないのは非常に困ります。
そこでクラスのメンバ関数でも並列演算できるようなPoolクラスを自作してみました。

mypool.pyのコード

mypool.py

# -*- coding: utf-8 -*-

from multiprocessing import Process, Pipe

"""
マルチスレッドで関数を実行するためのクラスです。
クラスの中から使えます。
"""
class MyPool:
    proc_num = 8

    def __init__(self, proc_num):
        self.proc_num = proc_num

    """
    指定した関数funcにargsの引数を一つ一つ与え実行します。
    これらはあらかじめ指定された数のプロセスで並列実行されます。
    """
    def map(self, func, args):
        def pipefunc(conn,arg):
            conn.send(func(arg))
            conn.close()
        ret = []
        k = 0
        while(k < len(args)):
            plist = []
            clist = []
            end = min(k + self.proc_num, len(args))
            for arg in args[k:end]:
                pconn, cconn = Pipe()
                plist.append(Process(target = pipefunc, args=(cconn,arg,)))
                clist.append(pconn)
            for p in plist:
                p.start()
            for conn in clist:
                ret.append(conn.recv())
            for p in plist:
                p.join()
            k += self.proc_num
        return ret

私が使いたい機能しか実装していないので、メソッドはmapしかありません。でも単に並列処理をするならこれで充分。

MyPoolの使い方

使い方はPoolのときと同様です。

test.py

from mypool import MyPool

class Test:
    def fuga(self, x):
        return x*x

    def hoge(self):
        p = MyPool(8)
        print p.map(self.fuga, range(10))

if __name__ == "__main__":
    test = Test()
    test.hoge()

出力結果

$ python test.py
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

import先を変更して、PoolをMyPoolにかえるだけです。

MyPoolの仕組み

どうやらいろいろ試しているとクラスのメンバ関数で並列演算するのにPoolは使えなくとも同じmultiprocessingのProcessは使えることが分かったので、それを使って並列処理。
返り値の処理はmultiprocessingのPipeを利用。

割り当てられた数のスレッド分だけ指示された関数を走らせ、走っている分がすべて終われば残りの処理をまた並列で行ないます。

実際使ってみて

MyPoolで並列演算しながらtopで各プロセスのCPU占有率などをみていましたが8スレッド指定をしてもどうやらいつも8スレッドフルパワーで動いてくれているわけではないようです。なんでかなぁ。
処理が速くなったことは確かですが。

MyPoolの使用に関して

上のmypool.pyはご自由に使用・改変していただいて構いません。ただしその動作や安全性に関して私は一切の保証をしません。

参考リンク

6.6. multiprocessing — Process-based “threading” interface
http://docs.python.org/library/multiprocessing.html

Multiprocessing の使いどころ。 - Twisted Mind
http://d.hatena.ne.jp/Voluntas/20081115/1226779786

Python 3.0 Hacks 第5回 multiprocessingモジュールによるプロセス間通信 - gihyo.jp
http://gihyo.jp/dev/serial/01/pythonhacks/0005