以线程安全的方式处理 write() 生成的 SIGPIPE

当为一个命名或未命名的管道或流套接字调用 write() 其读取结束时,会发生两件事:

Version <= POSIX.1-2001

  1. SIGPIPE 信号被发送到名为 write()进程

Version >= POSIX.1-2004

  1. SIGPIPE 信号被发送到名为 write()线程

  2. write() 返回 EPIPE 错误

有几种方法可以处理 SIGPIPE

  • 对于套接字,可以通过设置特定于平台的选项来禁用 SIGPIPE,例如 Linux 中的 MSG_NOSIGNAL 和 BSD 中的 SO_NOSIGPIPE(仅适用于 send,但不适用于 write)。这不便携。

  • 对于 FIFO(命名管道),如果 writer 使用 O_RDWR 而不是 O_WRONLY,则不会生成 SIGPIPE,因此读取端始终打开。但是,这也禁用了 EPIPE

  • 我们可以忽略 SIGPIPE 或设置全局处理程序。这是一个很好的解决方案,但是如果你不控制整个应用程序(例如你正在编写一个库),这是不可接受的。

  • 使用最近的 POSIX 版本,我们可以使用 SIGPIPE 发送到名为 write() 的线程并使用同步信号处理技术处理它的事实。

下面的代码演示了 POSIX.1-2004 及更高版本的线程安全 SIGPIPE 处理。

它的灵感来自这篇文章

  • 首先,使用 pthread_sigmask()SIGPIPE 添加到当前线程的信号掩码中。
  • 使用 sigpending() 检查是否已经有待定的 SIGPIPE
  • 调用 write()。如果读取结束,则 SIGPIPE 将被添加到待处理信号掩码中,并且将返回 EPIPE
  • 如果 write() 返回 EPIPE,并且 SIGPIPEwrite() 之前尚未处理,请使用 sigtimedwait() 将其从待处理信号掩码中删除
  • 使用 pthread_sigmask() 恢复原始信号掩码。

源代码:

#include <unistd.h>
#include <time.h>
#include <errno.h>
#include <sys/signal.h>

ssize_t safe_write(int fd, const void* buf, size_t bufsz)
{
    sigset_t sig_block, sig_restore, sig_pending;

    sigemptyset(&sig_block);
    sigaddset(&sig_block, SIGPIPE);

    /* Block SIGPIPE for this thread.
     *
     * This works since kernel sends SIGPIPE to the thread that called write(),
     * not to the whole process.
     */
    if (pthread_sigmask(SIG_BLOCK, &sig_block, &sig_restore) != 0) {
        return -1;
    }

    /* Check if SIGPIPE is already pending.
     */
    int sigpipe_pending = -1;
    if (sigpending(&sig_pending) != -1) {
        sigpipe_pending = sigismember(&sig_pending, SIGPIPE);
    }

    if (sigpipe_pending == -1) {
        pthread_sigmask(SIG_SETMASK, &sig_restore, NULL);
        return -1;
    }

    ssize_t ret;
    while ((ret = write(fd, buf, bufsz)) == -1) {
        if (errno != EINTR)
            break;
    }

    /* Fetch generated SIGPIPE if write() failed with EPIPE.
     *
     * However, if SIGPIPE was already pending before calling write(), it was
     * also generated and blocked by caller, and caller may expect that it can
     * fetch it later. Since signals are not queued, we don't fetch it in this
     * case.
     */
    if (ret == -1 && errno == EPIPE && sigpipe_pending == 0) {
        struct timespec ts;
        ts.tv_sec = 0;
        ts.tv_nsec = 0;

        int sig;
        while ((sig = sigtimedwait(&sig_block, 0, &ts)) == -1) {
            if (errno != EINTR)
                break;
        }
    }

    pthread_sigmask(SIG_SETMASK, &sig_restore, NULL);
    return ret;
}