Python - Pipe 在 Signal 發生時的處理事項
當我在 Python 中結合 multiprocessing.Pipe、multiprocessing.Process 與 signal 模組進行傳統的多工行程設計時,我注意到被擱置的管道資料讀取方法(read),在沒有取得資料的情形下就返回。使得父、子行程之間的互動過程不如預期。錯誤訊息顯示,讀取方法被系統中斷了(Interrupted system call)。read, signal, interrupted system call 這三個關鍵資訊,讓我直覺聯想到我碰到 EINTR 這個狀況了。當我在錯誤處理的流程中增加 errno 的判斷動作之後,我確認這就是 EINTR 狀況。
一位使用過 C 語言撰寫多工行程程式的程序員,對 EINTR 這件事絕不陌生。只是我未料想到會在 Python 之中再度碰到這位老朋友。對於這位老朋友,我們的招待原則請看《多工作業下的資料讀寫處理事項 - read()/write() 被 signal 中斷的處理 》。那篇處理事項中,提到 C 語言要準備兩招應付 EINTR 狀況。不過使用 Python 時則只需要第一招:如果錯誤是因為被 signal 中斷的話,就再讀一次,如果是其他原因導致的錯誤,則視為致命錯誤,應該中止程式繼續
。
典型的擱置式 I/O 處理模式
首先,我們先來看看典型的擱置式 I/O 處理模式。請看 blocking_io_without_signal.py。
import os,sys,multiprocessing,signal,time
def sub_process(pipe):
while True:
rc = pipe.recv()
print("sub process recv: %s" % rc)
if rc == "END":
return
if __name__ == "__main__":
rpipe, wpipe = multiprocessing.Pipe(False)
sub_process = multiprocessing.Process(target=sub_process, args=(rpipe,))
sub_process.start()
time.sleep(5)
wpipe.send("fool")
time.sleep(5)
wpipe.send("END")
sub_process.join()
這個範例程式,規劃了一對管道端子,分別交給父行程與子行程持有。父行程寫入資料,子行程等待讀取資料。當子行程讀到一行 "END" 之後,則結束子行程。
$ python blocking_io_without_signal.py
sub process recv: fool
sub process recv: END
程式執行5秒後輸出第一行 fool,再過5秒後輸出第二行 END。兩行程結束。
接著,我要往這個典型的處理程式中,加入訊號。最常見的狀況,就是應用 SIGALRM 實現計時器的場合。在這個場合中,等待管道輸入資料的擱置中讀取行為,將會因此被訊號 SIGALRM 提前打斷。加入 SIGALRM 計時器的範例程式: unsafe_blocking_io_with_signal.py。
import os,sys,multiprocessing,signal,time
def sub_process(pipe):
def alarm_timer(signum, func):
print("alarm(%d)" % signum)
signal.signal(signal.SIGALRM, alarm_timer)
signal.setitimer(signal.ITIMER_REAL, 1, 1)
#signal.siginterrupt(signal.SIGALRM, False)
# this does not work perfectly.
while True:
rc = pipe.recv()
print("sub process recv: %s" % rc)
if rc == "END":
return
if __name__ == "__main__":
rpipe, wpipe = multiprocessing.Pipe(False)
# this will be interrupted by any signal.
# If you are an expert Unix/POSIX programmer, you will invoke sigaction()
# with SA_RESTART to avoid this behaviour. That is modern way.
# But Python does not provide this way, therefore you need to use
# traditional way.
# See also:
# http://bugs.python.org/msg102725
sub_process = multiprocessing.Process(target=sub_process, args=(rpipe,))
sub_process.start()
time.sleep(5)
wpipe.send("fool")
time.sleep(5)
wpipe.send("END")
sub_process.join()
$ python unsafe_blocking_io_with_signal.py
alarm(14)
Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap
self.run()
File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "unsafe_blocking_io_with_signal.py", line 14, in sub_process
rc = pipe.recv()
IOError: [Errno 4] Interrupted system call
執行後,你將發現子行程未如預期般地於5秒後讀取資料。事實上,它在執行一秒後,就被系統打斷了。原因就是系統觸發了訊號 SIGALRM。
在我的試驗過程中,我曾嘗試使用 signal.siginterrupt() 抑制 SIGALRM 打斷擱置中的 I/O 行為之事。可惜效果不如預期,它仍然被打斷了。
實作較安全的 Pipe 替代品
Python 的 multiprocessing.Pipe 是一個工廠方法,它會生產一對管道讀寫端子。這對管道讀寫端子是 multiprocessing.Connection 的實體。我嘗試過幾種重構寫法後,採用的寫法是實作一個包覆 multiprocessing.Connection 的新類別 SignalSafeConnection 與一個替代的工廠方法 - SignalSafePipe。
SignalSafeConnection 將我需要的 I/O 處理方法,包裹在如果錯誤是因為被 signal 中斷的話,就再讀一次,如果是其他原因導致的錯誤,則視為致命錯誤,應該中止程式繼續
的處理原則中。SignalSafePipe 則會生產一對 SignalSafeConnection 類的管道讀寫端子;我用它替代 multiprocessing.Pipe 方法。
完整的實作範例,為 safe_blocking_io_with_signal.py。
import os,sys,multiprocessing,signal,time
class SignalSafeConnection(object):
def __init__(self, handle):
self.handle = handle
def send(self, obj):
# this is a traditional skill of unix programming,
# but be careful to use it.
while True:
try:
rc = self.handle.send(obj)
except (OSError, IOError) as e:
if e.errno == 4:
print("SIGIO, Interrupted system call, restart")
continue
else:
# unexcepted error, re-raise
raise
else:
break
return rc
def recv(self):
while True:
try:
rc = self.handle.recv()
except (OSError, IOError) as e:
if e.errno == 4:
print("SIGIO, Interrupted system call, restart")
continue
else:
# unexcepted error, re-raise
raise
else:
break
return rc
def fileno(self):
return self.handle.fileno()
def close(self):
self.handle.close()
self.handle = None
def poll(self, timeout=0):
end_time = time.time() + timeout
while True:
try:
rc = self.handle.poll(timeout)
except (OSError, IOError) as e:
if e.errno == 4:
current_time = time.time()
if current_time > end_time:
rc = False
break
print("SIGIO, Interrupted system call, restart")
timeout = end_time - current_time
continue
else:
# unexcepted error, re-raise
raise
else:
break
return rc
def send_bytes(self, buffer):
while True:
try:
rc = self.handle.send_bytes(buffer)
except (OSError, IOError) as e:
if e.errno == 4:
print("SIGIO, Interrupted system call, restart")
continue
else:
# unexcepted error, re-raise
raise
else:
break
return rc
def recv_bytes(self, maxlength):
while True:
try:
rc = self.handle.send_bytes(buffer)
except (OSError, IOError) as e:
if e.errno == 4:
print("SIGIO, Interrupted system call, restart")
continue
else:
# unexcepted error, re-raise
raise
else:
break
return rc
def SignalSafePipe(duplex):
pipe1, pipe2 = multiprocessing.Pipe(duplex)
sspipe1 = SignalSafeConnection(pipe1)
sspipe2 = SignalSafeConnection(pipe2)
return sspipe1, sspipe2
def sub_process(pipe):
def alarm_timer(signum, func):
print("alarm(%d)" % signum)
signal.signal(signal.SIGALRM, alarm_timer)
signal.setitimer(signal.ITIMER_REAL, 1, 1)
#signal.siginterrupt(signal.SIGALRM, False)
# this does not work perfectly.
while True:
rc = pipe.recv()
print("sub process recv: %s" % rc)
if rc == "END":
return
if __name__ == "__main__":
#rpipe, wpipe = multiprocessing.Pipe(False)
rpipe, wpipe = SignalSafePipe(False)
sub_process = multiprocessing.Process(target=sub_process, args=(rpipe,))
sub_process.start()
time.sleep(5)
wpipe.send("fool")
time.sleep(5)
wpipe.send("END")
sub_process.join()
$ python safe_blocking_io_with_signal.py
alarm(14)
SIGIO, Interrupted system call, restart
... (省略)
sub process recv: fool
alarm(14)
SIGIO, Interrupted system call, restart
... (省略)
sub process recv: END
程式執行5秒後輸出第一行 fool,再過5秒後輸出第二行 END。在等待的同時,計時器也會每秒觸發一次,這讓我們可以在等待的過程中,抽空做別的事情。這就是 process, pipe, signal 三者合作下進行多工作業程式設計的傳統技藝。
基於線程的並行模式
前幾節提到的基於訊號(signal)的並行模式,是傳統的設計模式。隨著現代作業系統逐漸實現線程(thread, 或稱執行緒)功能後,現代的並行設計模式主要是以線程來實現。
例如 unsafe_blocking_io_with_signal.py 的內容,在沒有意外的情況下,完全可以使用 threading.Timer 取代 signal.setitimer。如下列所示:
import os,sys,multiprocessing,signal,time,threading
def sub_process(pipe):
def alarm_timer(interval):
while True:
print("sub process alarm")
time.sleep(interval)
#signal.signal(signal.SIGALRM, alarm_timer)
#signal.setitimer(signal.ITIMER_REAL, 1, 1)
t = threading.Timer(1, alarm_timer, args=(1,))
t.start()
while True:
rc = pipe.recv()
print("sub process recv: %s" % rc)
if rc == "END":
return
if __name__ == "__main__":
rpipe, wpipe = multiprocessing.Pipe(False)
sub_process = multiprocessing.Process(target=sub_process, args=(rpipe,))
sub_process.start()
time.sleep(5)
wpipe.send("fool")
time.sleep(5)
wpipe.send("END")
sub_process.join()
如果你的程式中不需要利用訊號處理並行作業,那麼 Python 可以保證線程在運行時,不會干擾到 Pipe 的擱置行為。你便不需要考慮 EINTR 的狀況。
後記
GIL?
上一節中,我強調在沒有意外的情況下可以用線程取代訊號實現並行作業,這是因為我很不幸地碰到了意外情況。當我使用 Python 設計一個稍微複雜的服務程式時,我在主行程上配置的 threading.Timer 沒有運作。儘管 active_thread() 與 Timer.is_alive() 都顯示這個計時器還活著,但它就是沒有執行。直到子行程結束時,主行程的計時器才突然醒過來。我懷疑這是 Global Interpreter Lock 在搞鬼。最後我祭出了傳家法寶,用訊號實現我的需求。也因此與 EINTR 再會,並產生了這篇文章。
重構寫法
我原本嘗試用 Ruby 式 open class 寫法,直接修改 _multiprocessing.Connection 這個類別。然而 Python 向我抱怨這是一個原生的內容,不允許修改它的行為。即使我參考了《Opening python classes》的寫法也不行。Python 告訴我參考文章中使用的某些方法並不存在。我想或許是我用的 Python 版本比較舊吧。我用的是 Python 2.6。
預防行程無盡等待
當我們把 I/O 行為包入一個不被訊號所終止的迴圈時,我們要小心處理這個迴圈。因為我們缺乏資訊得知打斷擱置狀態的訊號是哪一個。這表示連使用者主動發出的訊號,也可能被忽視。例如你可能希望用 SIGALRM 增加一個逾時終止的功能,讓 SIGALRM 主動打斷一個等候太久的資料讀取行為。這時,我們的處理方式就會忽視這個訊號,導致行程陷入無盡地等待迴圈中。
這個狀況有解決方案,就是用 poll()。我們通常配合 poll() 使用 read(),可以指定最長擱置時間,以免行程陷入無盡地等待迴圈。
樂多舊回應