跳到主要內容

prunner.py: 連猴子都會用的加速框架

問題

寫 python script 時有時會遇到 I/O bound (例如需發出大量 HTTP request) 或 CPU bound, 這時可用 multiprocessing 加速。但是自己寫 multi-process 有點麻煩:

  • 要留意 API 使用細節
  • 容易發生 race condition 或 dead lock。
  • 程式較複雜時會切成多個階段作業, 有時會閒置部份 processes 而浪費了加速的機會。
  • 辛苦寫好的平行架構不易重覆使用 (我至少重寫三次以上...泣)。

解法

prunner.py 是我寫好的框架, 中心思想是透過 task queue 提供容易重覆使用的架構, 還有協助減少閒置的 process。

以計算 sum( (i+1) * 2 ) 為例, 下面的程式會用很蠢的方式透過 10 個 processes 平行計算出結果:

import prunner

def begin():
    prunner.get_dict()['sum'] = 0
    prunner.post_task(init, range(2000))

def init(numbers):
    for i in numbers:
        prunner.post_task(add_one, i)

def add_one(n):
    prunner.post_task(double, n + 1)

def double(n):
    prunner.post_task(sum_up, n)
    prunner.post_task(sum_up, n)

def sum_up(n):
    with prunner.global_lock():
        prunner.get_dict()['sum'] += n

def end():
    print prunner.get_dict()['sum']

prunner.init(10, False, begin, end)
prunner.start()

每個 function call 會在一個 process 上執行, 藉此增加使用到的 CPU。也可以盡情地用 blocking I/O, 反正有很多 processes 會平行執行程式。

若不習慣 message loop 的寫法, 也可以用較 low level 的 API, 直接繼承 ParallelTaskRunner 然後覆寫 begin(), run(), end():

import prunner

TASK_INIT = 'task_init'
TASK_ADD_ONE = 'task_add_one'
TASK_DOUBLE = 'task_double'
TASK_SUM = 'task_sum'

class MyRunner(prunner.ParallelTaskRunner):
    def begin(self, options):
        self.dict_['sum'] = 0
        self.queue.put(prunner.Task(TASK_INIT, range(2000)))

    def run(self, task):
        if task.label == TASK_INIT:
            for i in task.data:
                self.queue.put(prunner.Task(TASK_ADD_ONE, i))
            return

        if task.label == TASK_ADD_ONE:
            self.queue.put(prunner.Task(TASK_DOUBLE, task.data + 1))
            return

        if task.label == TASK_DOUBLE:
            self.queue.put(prunner.Task(TASK_SUM, task.data))
            self.queue.put(prunner.Task(TASK_SUM, task.data))
            return

        if task.label == TASK_SUM:
            with prunner.ScopeLock(self.lock):
                self.dict_['sum'] += task.data

    def end(self):
        print self.dict_['sum']


runner = MyRunner(10, False, None)
runner.start()

和前面一樣, 同時有多個 processes 執行 run(), 每次呼叫帶有不同的資料。

雜談

1. 因為底層是 multiprocessing, prunner.py 可以處理 CPU bound 和 I/O bound, 直接用 blocking I/O 也沒問題, 方便配合 third-party library。

不過 I/O 需求量極高時, 還是用 gevent 較適當, 但是要找合 gevent 的 third-party library 或用 gevent monkey patch

2. 這個架構中比較需要巧思的部份是判斷程式的中止條件並且提供有效率的實作。中止條件是「所有 process 閒置且 queue 是空的」。

各個 process 有一個獨立的 flag 表示閒置與否, 這樣比用一個共用的計數器有效率。在我的電腦上執行 prunner_example.py 的時候, 獨立 flag 和計數器的時間分別是 2.5s 和 3.6s。

3. 雖然可以用 post_task() 執行 instance method, 不過會隱含 serialize/deserialize instance 以及傳到其它 process 的成本。這是用框架的缺點, 用起來方便, 但不小心會造成不必要的 overhead。我寫了兩個例子比較: prunner_example3.py 用 instance method, pyrunner_example2.py 用 function。執行時間分別是 2.8s 和 2.6s。

4. 試過各種架構後, 覺得 task queue (或稱 message queue) 還是最直覺的運作方式。有想過 producer-consumer 或 MapReduce 的架構, 不過還是 task queue 最易使用。這種單機層級的運算, 用 task queue 應該綽綽有餘。若要跨機器執行, 太過彈性可能會增加實作的複雜度。那時應該用別人作好的成熟工具。

留言

這個網誌中的熱門文章

(C/C++ ) 如何在 Linux 上使用自行編譯的第三方函式庫

以使用 LevelDB 為例。 抓好並編好相關檔案,編譯方式見第三方函式庫附的說明:$ ls include/ # header files leveldb/ $ ls out-shared/libleveldb.so* # shared library out-shared/libleveldb.so@ out-shared/libleveldb.so.1@ out-shared/libleveldb.so.1.20* 下面的例子用 clang++ 編譯,這裡用到的參數和 g++ 一樣。 問題一:找不到 header$ clang++ sample.cpp sample.cpp:5:10: fatal error: 'leveldb/db.h' file not found #include "leveldb/db.h" ^ 1 error generated. 解法:用 -I 指定 header 位置 問題二:找不到 shared library$ clang++ sample.cpp -I include/ /tmp/sample-2e7dd8.o: In function `main': sample.cpp:(.text+0x1e): undefined reference to `leveldb::Options::Options()' sample.cpp:(.text+0x6f): undefined reference to `leveldb::DB::Open(leveldb::Options const&, std::string const&, leveldb::DB**)' sample.cpp:(.text+0x10c): undefined reference to `leveldb::Status::ToString() const' sample.cpp:(.text+0x7d0): undefined reference to `leveldb::Status::ToString() const' clang: error: linker command failed with exit code 1 (u…

virtualbox 使用 USB 裝置

2012-12-16 更新 現在 (4.x 版) 似乎無需做任何設定, 只要有裝 Oracle VM VirtualBox Extension Pack, 在 VirtualBox 視窗右下角按 USB 的圖示, 再點目標裝置, 即可加入或移除該裝置 同一時間只有 host 或 guest 可擁有該裝置, 所以從 guest OS 移除, 相當於接回 host OS 目前 VirtualBox 只支援 USB 2.0 的插槽, 若偵測不到時, 注意一下是否為這個問題 有時拔拔插插, VirtualBox 會進入奇怪的狀態, 接上去 guest OS 無法連接且跳出 device is busy 的錯誤訊息。試看看拔除該裝置, 重開 guest OS (續上則) 若重開 guest OS 無效, 並且 host OS 已移除該裝置, VirtualBox 的 USB 清單卻仍顯示 "captured", 試看看拔除該裝置, 重開 host OS原文網路上搜一下, 比較多是 Ubuntu 當 host 的解法, 我的情況是 Win7 當 host, Ubuntu 當 guest。 這兩篇說明很詳細《Learn How to Set Up USB and Networking Options in VirtualBox》《幻影千瞳的部落格: VirtualBox 使用筆記(二):使用 USB 裝置》 現在的版本圖形介面很好用了, 不用像第二篇說的那樣用指令操作。這裡記下我的操作步驟: 關掉 guest OS 在 VirtualBox 選單, 選擇 guest OS -> Settings -> USB -> Enable USB 2.0 會出現訊息框, 說明要安裝 Oracle VM VirtualBox Extension Pack。下載後安裝它 host OS 插入 USB 隨身碟 在 VirtualBox 選單, 選擇 guest OS -> Settings -> USB, 點右邊有綠色 "+" 的 USB 頭的圖示, 選擇該 USB 隨身碟, 加入它的 filter 從 host OS 移除 USB 隨身碟 開啟 guest OS 插入 USB 隨身碟, 於是 guest OS 會自動偵測…

解決 undefined symbol / reference

C++ 新手上路, 有錯還請幫忙指正。 基本觀念相較於 script language 或 Java 來說, C/C++ 有完整的「編譯 -> 連結 -> 執行」三個階段, 各階段都可能發生 undefined symbol。在解決惱人的 undefined symbol 前, 得先明白整個編譯流程: 編譯 .c / .cpp 為 .o (object file) 時, 需要提供 header 檔 (用到 gcc 參數 -I)。事實上, 在編譯單一檔案時, gcc/g++ 根本不在意真正的 symbol 是否存在, 反正有宣告它就信了, 所以有引對 header 即可。這也是可分散編譯的原因 (如 distcc ), 程式之間在編譯成 .o 檔時, 並沒有相依性。 用 linker (ld 或 gold) 將 *.o 連結成 dynamic library 或執行檔時, 需要提供要連結的 library (用到 gcc 參數 -L 指定目錄位置, 用 -l 指定要連什麼函式庫)。不同於前一步, 此時 symbol 一定要在。 執行的時候, 會再動態開啟 shared library 讀出 symbol。換句話說, 前一個步驟只是檢查是否有。檢查通過也連結成 executable 或 shared library 後, 若執行時對應的檔案不見了, 仍會在執行期間找不到 symbol。若位置沒設好, 可能需要用 LIB_LIBRARY_PATH 指定動態函式的位置, 但不建議這麼做, 最好在執行 linker 時就指定好位置。原因見《Why LD_LIBRARY_PATH is bad》。明白這點後, 就看 undefined symbol 發生在那個階段, 若是編 object file 時發生, 就是沒和編譯器說 header 檔在那, 記得用 -I 告訴它。若在 linking 時發生, 就要同時設好 -L 和 -l。不過難就難在要去那找 undefined symbol 的出處。 解決問題的流程首先是判斷 symbol 是不是自己用到的原始碼裡, 可配合 id-utils 找看看 (我是用 gj, 比較方便一點)。或是看有沒有 man page, 有 man page 的話, 裡面會記錄用到的 header 和該怎麼下連結參數。若在專案裡找不到, …