以執行緒安全的方式處理 write() 生成的 SIGPIPE

當為一個命名或未命名的管道或流套接字呼叫 write() 其讀取結束時,會發生兩件事:

Version <= POSIX.1-2001

  1. SIGPIPE 訊號被髮送到名為 write()程序

Version >= POSIX.1-2004

  1. SIGPIPE 訊號被髮送到名為 write()執行緒

  2. write() 返回 EPIPE 錯誤

有幾種方法可以處理 SIGPIPE

  • 對於套接字,可以通過設定特定於平臺的選項來禁用 SIGPIPE,例如 Linux 中的 MSG_NOSIGNAL 和 BSD 中的 SO_NOSIGPIPE(僅適用於 send,但不適用於 write)。這不便攜。

  • 對於 FIFO(命名管道),如果 writer 使用 O_RDWR 而不是 O_WRONLY,則不會生成 SIGPIPE,因此讀取端始終開啟。但是,這也禁用了 EPIPE

  • 我們可以忽略 SIGPIPE 或設定全域性處理程式。這是一個很好的解決方案,但是如果你不控制整個應用程式(例如你正在編寫一個庫),這是不可接受的。

  • 使用最近的 POSIX 版本,我們可以使用 SIGPIPE 傳送到名為 write() 的執行緒並使用同步訊號處理技術處理它的事實。

下面的程式碼演示了 POSIX.1-2004 及更高版本的執行緒安全 SIGPIPE 處理。

它的靈感來自這篇文章

  • 首先,使用 pthread_sigmask()SIGPIPE 新增到當前執行緒的訊號掩碼中。
  • 使用 sigpending() 檢查是否已經有待定的 SIGPIPE
  • 呼叫 write()。如果讀取結束,則 SIGPIPE 將被新增到待處理訊號掩碼中,並且將返回 EPIPE
  • 如果 write() 返回 EPIPE,並且 SIGPIPEwrite() 之前尚未處理,請使用 sigtimedwait() 將其從待處理訊號掩碼中刪除
  • 使用 pthread_sigmask() 恢復原始訊號掩碼。

原始碼:

#include <unistd.h>
#include <time.h>
#include <errno.h>
#include <sys/signal.h>

ssize_t safe_write(int fd, const void* buf, size_t bufsz)
{
    sigset_t sig_block, sig_restore, sig_pending;

    sigemptyset(&sig_block);
    sigaddset(&sig_block, SIGPIPE);

    /* Block SIGPIPE for this thread.
     *
     * This works since kernel sends SIGPIPE to the thread that called write(),
     * not to the whole process.
     */
    if (pthread_sigmask(SIG_BLOCK, &sig_block, &sig_restore) != 0) {
        return -1;
    }

    /* Check if SIGPIPE is already pending.
     */
    int sigpipe_pending = -1;
    if (sigpending(&sig_pending) != -1) {
        sigpipe_pending = sigismember(&sig_pending, SIGPIPE);
    }

    if (sigpipe_pending == -1) {
        pthread_sigmask(SIG_SETMASK, &sig_restore, NULL);
        return -1;
    }

    ssize_t ret;
    while ((ret = write(fd, buf, bufsz)) == -1) {
        if (errno != EINTR)
            break;
    }

    /* Fetch generated SIGPIPE if write() failed with EPIPE.
     *
     * However, if SIGPIPE was already pending before calling write(), it was
     * also generated and blocked by caller, and caller may expect that it can
     * fetch it later. Since signals are not queued, we don't fetch it in this
     * case.
     */
    if (ret == -1 && errno == EPIPE && sigpipe_pending == 0) {
        struct timespec ts;
        ts.tv_sec = 0;
        ts.tv_nsec = 0;

        int sig;
        while ((sig = sigtimedwait(&sig_block, 0, &ts)) == -1) {
            if (errno != EINTR)
                break;
        }
    }

    pthread_sigmask(SIG_SETMASK, &sig_restore, NULL);
    return ret;
}