2015年8月14日 星期五

prunner.py: 連猴子都會用的加速框架

問題

寫 python script 時有時會遇到 I/O bound (例如需發出大量 HTTP request) 或 CPU bound, 這時可用 multiprocessing 加速。但是自己寫 multi-process 有點麻煩:

  • 要留意 API 使用細節
  • 容易發生 race condition 或 dead lock。
  • 程式較複雜時會切成多個階段作業, 有時會閒置部份 processes 而浪費了加速的機會。
  • 辛苦寫好的平行架構不易重覆使用 (我至少重寫三次以上...泣)。

解法

prunner.py 是我寫好的框架, 中心思想是透過 task queue 提供容易重覆使用的架構, 還有協助減少閒置的 process。

以計算 sum( (i+1) * 2 ) 為例, 下面的程式會用很蠢的方式透過 10 個 processes 平行計算出結果:

import prunner

def begin():
    prunner.get_dict()['sum'] = 0
    prunner.post_task(init, range(2000))

def init(numbers):
    for i in numbers:
        prunner.post_task(add_one, i)

def add_one(n):
    prunner.post_task(double, n + 1)

def double(n):
    prunner.post_task(sum_up, n)
    prunner.post_task(sum_up, n)

def sum_up(n):
    with prunner.global_lock():
        prunner.get_dict()['sum'] += n

def end():
    print prunner.get_dict()['sum']

prunner.init(10, False, begin, end)
prunner.start()

每個 function call 會在一個 process 上執行, 藉此增加使用到的 CPU。也可以盡情地用 blocking I/O, 反正有很多 processes 會平行執行程式。

若不習慣 message loop 的寫法, 也可以用較 low level 的 API, 直接繼承 ParallelTaskRunner 然後覆寫 begin(), run(), end():

import prunner

TASK_INIT = 'task_init'
TASK_ADD_ONE = 'task_add_one'
TASK_DOUBLE = 'task_double'
TASK_SUM = 'task_sum'

class MyRunner(prunner.ParallelTaskRunner):
    def begin(self, options):
        self.dict_['sum'] = 0
        self.queue.put(prunner.Task(TASK_INIT, range(2000)))

    def run(self, task):
        if task.label == TASK_INIT:
            for i in task.data:
                self.queue.put(prunner.Task(TASK_ADD_ONE, i))
            return

        if task.label == TASK_ADD_ONE:
            self.queue.put(prunner.Task(TASK_DOUBLE, task.data + 1))
            return

        if task.label == TASK_DOUBLE:
            self.queue.put(prunner.Task(TASK_SUM, task.data))
            self.queue.put(prunner.Task(TASK_SUM, task.data))
            return

        if task.label == TASK_SUM:
            with prunner.ScopeLock(self.lock):
                self.dict_['sum'] += task.data

    def end(self):
        print self.dict_['sum']


runner = MyRunner(10, False, None)
runner.start()

和前面一樣, 同時有多個 processes 執行 run(), 每次呼叫帶有不同的資料。

雜談

1. 因為底層是 multiprocessing, prunner.py 可以處理 CPU bound 和 I/O bound, 直接用 blocking I/O 也沒問題, 方便配合 third-party library。

不過 I/O 需求量極高時, 還是用 gevent 較適當, 但是要找合 gevent 的 third-party library 或用 gevent monkey patch

2. 這個架構中比較需要巧思的部份是判斷程式的中止條件並且提供有效率的實作。中止條件是「所有 process 閒置且 queue 是空的」。

各個 process 有一個獨立的 flag 表示閒置與否, 這樣比用一個共用的計數器有效率。在我的電腦上執行 prunner_example.py 的時候, 獨立 flag 和計數器的時間分別是 2.5s 和 3.6s。

3. 雖然可以用 post_task() 執行 instance method, 不過會隱含 serialize/deserialize instance 以及傳到其它 process 的成本。這是用框架的缺點, 用起來方便, 但不小心會造成不必要的 overhead。我寫了兩個例子比較: prunner_example3.py 用 instance method, pyrunner_example2.py 用 function。執行時間分別是 2.8s 和 2.6s。

4. 試過各種架構後, 覺得 task queue (或稱 message queue) 還是最直覺的運作方式。有想過 producer-consumer 或 MapReduce 的架構, 不過還是 task queue 最易使用。這種單機層級的運算, 用 task queue 應該綽綽有餘。若要跨機器執行, 太過彈性可能會增加實作的複雜度。那時應該用別人作好的成熟工具。

沒有留言:

張貼留言

在 Fedora 下裝 id-utils

Fedora 似乎因為執行檔撞名,而沒有提供 id-utils 的套件 ,但這是使用 gj 的必要套件,只好自己編。從官網抓好 tarball ,解開來編譯 (./configure && make)就是了。 但編譯後會遇到錯誤: ./stdio.h:10...