server push by long polling

程式碼

沒興趣讀落落長心得的人, 這裡有用 Tornado 實作的程式測試碼

心得

server push 是指從 server 主動送訊息給 client, 這裡有圖解一些達成 server push 的方式。其中我個人偏好 long polling 的作法,運作方式就和字面一樣: client 先發出一個連線, server 不要立即回應。等 server 需要主動通知 client 時, 再回傳資料。

這個作法的好處是:

  • 各種 client 都適用, 只要能用 http 連線即可, 不是瀏覽器也OK, 現在各個平台都有好用的 http 函式庫。
  • 不用擔心 client 網路環境問題。client 能主動建立往 server 的連線, 反過來就很難說了。

另一方面, long polling 的缺點是 server 會有許多閒置的連線占資源。像 apache 這類每個 client 用獨立的 thread 處理連線的作法, 就不適合處理 long polling。假設一個 thread 占據 10MB 空間, 1k 個不作事的連線, 就占掉 10G 記憶體了。另外大量 thread 之間的 context switch 也是可觀的時間負擔。

但是改用 epoll 之類的 API 寫成 event-driven 的架構, 可用 single process single thread 的方式同時處理大量連線, 就沒有浪費記憶體和 context switch 的時間負擔。現在有不少現成的工具使用 epoll 包成 green thread, 使用上和 multi-thread 一樣容易上手 (而且還不用擔心 race condition, 因為實際上只有一個 native thread),降低實作門檻。其中 Tornado 是 Python 寫成的 web framework 並內建可上線用的 web server。我作了簡單的 benchmark, 的確可以快速反應一秒內同時擁入的一千個連線。用 Tornado 省去不少工夫 (若不想用 web server 而是自己從頭作 server, 也可用 gevent)。

實作 long polling 的另一個問題是: 如何在察覺資料更新時, 能立即通知 client? 想像 client 透過連線 S 連往 server, server 先不作回應。接著 server 在其它地方取得新資料, 再透過 S 回傳資料。處理連線 S 的程式要怎麼暫停? 之後要怎麼返回執行? 這個作法可以 scale 嗎?

對於 single process 程式來說, 直覺且低成本的作法是: 在 thread A 讀資料, 發覺沒資料時改用 condition 變數 (搭配 lock) 停住程式, 然後在 thread B 收到新資料時, 用同一個 condition 變數通知 thread A 可以繼續讀資料。對應到 Tornnado 的寫法是用 locks.Condition:

condition = locks.Condition()

@gen.coroutine
def waiter():
    print("I'll wait right here")
    yield condition.wait()  # Yield a Future.
    print("I'm done waiting")

@gen.coroutine
def notifier():
    print("About to notify")
    condition.notify()
    print("Done notifying")

@gen.coroutine
def runner():
    # Yield two Futures; wait for waiter() and notifier() to finish.
    yield [waiter(), notifier()]

io_loop.run_sync(runner)

有了 locks.Condition, 要實作一個完整可上線使用的 long polling 只是小事一椿, 下面的程式碼實作兩個 API /get 和 /set: 提供基本的 "get value by key" 和 "set value for key"。/get 沒資料時會先停住不回應, 等到 /set 取得資料後再立即返回。這只是示意用程式, 沒防錯也沒控管記憶體的用量:

class Entry(object):
   ...

g_cache = {}

class GetHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self):
        global g_cache

        key = self.get_argument("key", None)
        entry = g_cache.get(key, None)
        if not entry:
            entry = g_cache[key] = Entry()

        value = entry.get_value()
        if value is not None:
            self.write(json.dumps({
                'error_code': ERROR_OK,
                'value': value,
            }))
            return

        # Wait the data to be ready.
        timeout = self.get_argument("timeout", DEFAULT_TIMEOUT)
        now = tornado.ioloop.IOLoop.current().time()
        timeout = now + float(timeout)
        condition = entry.get_condition()
        try:
            yield condition.wait(timeout=timeout)  # Yield a Future.
        except Exception, e:
            logging.exception('condition timeout? timeout=%s' % str(timeout))
            self.write(json.dumps({'error_code': ERROR_TIMEOUT}))
            return

        value = entry.get_value()
        self.write(json.dumps({
            'error_code': ERROR_OK,
            'value': value,
        }))


class SetHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self):
        global g_cache

        key = self.get_argument("key", None)
        value = self.get_argument("value", None)
        entry = g_cache.get(key, None)
        if not entry:
            entry = g_cache[key] = Entry()
        entry.set_value(value)
        condition = entry.get_condition()
        condition.notify_all()
        self.write(json.dumps({'error_code': ERROR_OK}))

application = tornado.web.Application([
    (r"/get", GetHandler),
    (r"/set", SetHandler),
])

完整的程式見開頭的連結。

留言

這個網誌中的熱門文章

(C/C++ ) 如何在 Linux 上使用自行編譯的第三方函式庫

熟悉系統工具好處多多

virtualbox 使用 USB 裝置