Skip to content

ipc::sync::condition

mutouyun edited this page Dec 9, 2025 · 1 revision

进程间条件变量,用于线程/进程间的事件等待和通知。必须与ipc::sync::mutex配合使用。

namespace ipc {
namespace sync {

class condition {
public:
    condition();
    explicit condition(char const *name);
    ~condition();

    void const *native() const noexcept;
    void *native() noexcept;

    bool valid() const noexcept;

    bool open(char const *name) noexcept;
    void close() noexcept;

    void clear() noexcept;
    static void clear_storage(char const * name) noexcept;

    bool wait(ipc::sync::mutex &mtx, std::uint64_t tm = ipc::invalid_value) noexcept;
    bool notify(ipc::sync::mutex &mtx) noexcept;
    bool broadcast(ipc::sync::mutex &mtx) noexcept;
};

} // namespace sync
} // namespace ipc
成员 说明
condition 构造函数
~condition 析构函数
open 打开一个命名的condition
close 关闭condition
clear 清理condition资源
clear_storage 清理指定名称的condition存储
wait 等待条件满足
notify 通知一个等待者
broadcast 通知所有等待者
valid 检查condition是否有效
native 获取原生句柄

构造函数

condition

/*A*/ condition();
/*B*/ explicit condition(char const *name);
  • A. 默认构造函数,创建一个匿名condition(仅用于进程内同步)
  • B. 创建或打开一个命名的condition(用于进程间同步)
参数 说明
name char const *,condition的名称。使用相同名称可以在不同进程间共享condition

~condition

~condition();

析构函数。自动释放condition资源。

资源管理

open

bool open(char const *name) noexcept;

打开一个命名的condition。如果condition不存在,会自动创建。

参数 说明
name char const *,condition的名称
返回值 说明
true 打开成功
false 打开失败

close

void close() noexcept;

关闭condition。释放相关资源。

clear

void clear() noexcept;

清理已打开condition的所有资源,包括共享内存。

clear_storage

static void clear_storage(char const * name) noexcept;

静态方法。清理指定名称的condition存储资源。

参数 说明
name char const *,要清理的condition名称

条件变量操作

wait

bool wait(ipc::sync::mutex &mtx, std::uint64_t tm = ipc::invalid_value) noexcept;

等待条件满足。调用时必须已经持有mutex锁。函数会自动释放mutex并进入等待状态,被唤醒后会重新获得mutex锁。

参数 说明
mtx ipc::sync::mutex &,关联的mutex引用。调用前必须已加锁
tm std::uint64_t,超时时间(毫秒)。默认为ipc::invalid_value(无限等待)
返回值 说明
true 被成功唤醒(收到通知)
false 超时

重要:wait()返回后,无论成功还是超时,mutex都已被重新锁定。

notify

bool notify(ipc::sync::mutex &mtx) noexcept;

唤醒一个等待的线程/进程。

参数 说明
mtx ipc::sync::mutex &,关联的mutex引用
返回值 说明
true 操作成功
false 操作失败

注意:通常在持有mutex锁的情况下调用notify()。

broadcast

bool broadcast(ipc::sync::mutex &mtx) noexcept;

唤醒所有等待的线程/进程。

参数 说明
mtx ipc::sync::mutex &,关联的mutex引用
返回值 说明
true 操作成功
false 操作失败

状态查询

valid

bool valid() const noexcept;

检查condition是否有效(已打开)。

返回值 说明
true condition有效
false condition无效或未打开

native

/*A*/ void const *native() const noexcept;
/*B*/ void *native() noexcept;

获取平台相关的原生condition句柄。

使用示例

生产者-消费者模式

#include "libipc/condition.h"
#include "libipc/mutex.h"
#include "libipc/shm.h"
#include <iostream>
#include <thread>
#include <deque>

constexpr int MAX_QUEUE_SIZE = 10;

struct shared_queue {
    int data[MAX_QUEUE_SIZE];
    int front;
    int rear;
    int count;
};

// 生产者
void producer(int id) {
    ipc::sync::mutex mtx("queue_mutex");
    ipc::sync::condition not_full("not_full");
    ipc::sync::condition not_empty("not_empty");
    
    ipc::shm::handle shm("shared_queue", sizeof(shared_queue));
    auto* queue = static_cast<shared_queue*>(shm.get());
    
    for (int i = 0; i < 20; ++i) {
        int item = id * 1000 + i;
        
        mtx.lock();
        
        // 等待队列不满
        while (queue->count >= MAX_QUEUE_SIZE) {
            std::cout << "Producer " << id << ": queue full, waiting..." << std::endl;
            if (!not_full.wait(mtx, 5000)) {
                std::cout << "Producer " << id << ": timeout!" << std::endl;
                mtx.unlock();
                continue;
            }
        }
        
        // 生产数据
        queue->data[queue->rear] = item;
        queue->rear = (queue->rear + 1) % MAX_QUEUE_SIZE;
        queue->count++;
        std::cout << "Producer " << id << " produced: " << item 
                 << " (count=" << queue->count << ")" << std::endl;
        
        // 通知消费者
        not_empty.notify(mtx);
        mtx.unlock();
        
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

// 消费者
void consumer(int id) {
    ipc::sync::mutex mtx("queue_mutex");
    ipc::sync::condition not_full("not_full");
    ipc::sync::condition not_empty("not_empty");
    
    ipc::shm::handle shm("shared_queue", sizeof(shared_queue));
    auto* queue = static_cast<shared_queue*>(shm.get());
    
    for (int i = 0; i < 20; ++i) {
        mtx.lock();
        
        // 等待队列不空
        while (queue->count == 0) {
            std::cout << "Consumer " << id << ": queue empty, waiting..." << std::endl;
            if (!not_empty.wait(mtx, 5000)) {
                std::cout << "Consumer " << id << ": timeout!" << std::endl;
                mtx.unlock();
                continue;
            }
        }
        
        // 消费数据
        int item = queue->data[queue->front];
        queue->front = (queue->front + 1) % MAX_QUEUE_SIZE;
        queue->count--;
        std::cout << "Consumer " << id << " consumed: " << item 
                 << " (count=" << queue->count << ")" << std::endl;
        
        // 通知生产者
        not_full.notify(mtx);
        mtx.unlock();
        
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
    }
}

int main() {
    // 初始化共享队列
    {
        ipc::shm::handle shm("shared_queue", sizeof(shared_queue));
        auto* queue = static_cast<shared_queue*>(shm.get());
        queue->front = 0;
        queue->rear = 0;
        queue->count = 0;
    }
    
    std::thread p1(producer, 1);
    std::thread c1(consumer, 1);
    std::thread c2(consumer, 2);
    
    p1.join();
    c1.join();
    c2.join();
    
    // 清理
    ipc::sync::mutex::clear_storage("queue_mutex");
    ipc::sync::condition::clear_storage("not_full");
    ipc::sync::condition::clear_storage("not_empty");
    
    return 0;
}

事件通知

// 简单的事件通知示例
void waiter(int id) {
    ipc::sync::mutex mtx("event_mutex");
    ipc::sync::condition cond("event_cond");
    ipc::shm::handle shm("event_flag", sizeof(bool));
    auto* event_occurred = static_cast<bool*>(shm.get());
    
    mtx.lock();
    
    // 等待事件发生
    while (!*event_occurred) {
        std::cout << "Waiter " << id << ": waiting for event..." << std::endl;
        cond.wait(mtx);
    }
    
    std::cout << "Waiter " << id << ": event occurred!" << std::endl;
    mtx.unlock();
}

void trigger_event() {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    
    ipc::sync::mutex mtx("event_mutex");
    ipc::sync::condition cond("event_cond");
    ipc::shm::handle shm("event_flag", sizeof(bool));
    auto* event_occurred = static_cast<bool*>(shm.get());
    
    mtx.lock();
    *event_occurred = true;
    std::cout << "Event triggered!" << std::endl;
    cond.broadcast(mtx);  // 唤醒所有等待者
    mtx.unlock();
}

int main() {
    // 初始化事件标志
    {
        ipc::shm::handle shm("event_flag", sizeof(bool));
        auto* flag = static_cast<bool*>(shm.get());
        *flag = false;
    }
    
    std::thread w1(waiter, 1);
    std::thread w2(waiter, 2);
    std::thread w3(waiter, 3);
    std::thread trigger(trigger_event);
    
    w1.join();
    w2.join();
    w3.join();
    trigger.join();
    
    return 0;
}

读写者模式(写者优先)

struct rw_state {
    int readers;
    int writers;
    int waiting_writers;
};

void reader(int id) {
    ipc::sync::mutex mtx("rw_mutex");
    ipc::sync::condition read_cond("read_cond");
    ipc::shm::handle shm("rw_state", sizeof(rw_state));
    auto* state = static_cast<rw_state*>(shm.get());
    
    mtx.lock();
    
    // 等待没有写者和等待的写者
    while (state->writers > 0 || state->waiting_writers > 0) {
        read_cond.wait(mtx);
    }
    
    state->readers++;
    std::cout << "Reader " << id << " started reading (readers=" 
             << state->readers << ")" << std::endl;
    mtx.unlock();
    
    // 读取数据(模拟)
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    
    mtx.lock();
    state->readers--;
    std::cout << "Reader " << id << " finished reading" << std::endl;
    
    if (state->readers == 0) {
        // 最后一个读者通知写者
        ipc::sync::condition write_cond("write_cond");
        write_cond.notify(mtx);
    }
    mtx.unlock();
}

void writer(int id) {
    ipc::sync::mutex mtx("rw_mutex");
    ipc::sync::condition write_cond("write_cond");
    ipc::sync::condition read_cond("read_cond");
    ipc::shm::handle shm("rw_state", sizeof(rw_state));
    auto* state = static_cast<rw_state*>(shm.get());
    
    mtx.lock();
    state->waiting_writers++;
    
    // 等待没有读者和写者
    while (state->readers > 0 || state->writers > 0) {
        write_cond.wait(mtx);
    }
    
    state->waiting_writers--;
    state->writers++;
    std::cout << "Writer " << id << " started writing" << std::endl;
    mtx.unlock();
    
    // 写入数据(模拟)
    std::this_thread::sleep_for(std::chrono::milliseconds(300));
    
    mtx.lock();
    state->writers--;
    std::cout << "Writer " << id << " finished writing" << std::endl;
    
    // 优先唤醒等待的写者,否则唤醒所有读者
    if (state->waiting_writers > 0) {
        write_cond.notify(mtx);
    } else {
        read_cond.broadcast(mtx);
    }
    mtx.unlock();
}

条件变量与虚假唤醒

// 正确处理虚假唤醒的模式
void wait_for_condition() {
    ipc::sync::mutex mtx("my_mutex");
    ipc::sync::condition cond("my_cond");
    ipc::shm::handle shm("condition_state", sizeof(bool));
    auto* ready = static_cast<bool*>(shm.get());
    
    mtx.lock();
    
    // 始终在循环中检查条件(防止虚假唤醒)
    while (!*ready) {
        // wait()返回后会重新持有锁
        cond.wait(mtx);
        
        // 醒来后重新检查条件
        // 如果是虚假唤醒,条件不满足,继续等待
    }
    
    // 条件满足,执行操作
    std::cout << "Condition met, proceeding..." << std::endl;
    
    mtx.unlock();
}

使用模式总结

标准使用模式

// 等待方
mtx.lock();
while (!condition_is_true) {  // 使用while而非if
    cond.wait(mtx);
}
// 使用共享资源
mtx.unlock();

// 通知方
mtx.lock();
// 修改共享状态
condition_is_true = true;
cond.notify(mtx);  // 或 cond.broadcast(mtx)
mtx.unlock();

注意事项

  • 必须与mutex配合:condition变量必须与mutex一起使用
  • 锁的状态
    • 调用wait()前必须持有mutex锁
    • wait()会自动释放锁并进入等待
    • 被唤醒后会自动重新获得锁
  • 虚假唤醒:始终在循环中检查条件,不要假设被唤醒就意味着条件满足
  • notify vs broadcast
    • notify():只唤醒一个等待者(适合工作队列)
    • broadcast():唤醒所有等待者(适合状态变化通知)
  • 死锁避免
    • 不要在持有多个锁时调用wait()
    • 确保notify/broadcast在合适的时机调用
  • 超时处理:wait()超时返回false,但mutex仍然被锁定,需要手动解锁
  • 清理资源:程序退出前调用clear()clear_storage()

平台差异

Linux / FreeBSD / QNX

  • 基于POSIX pthread条件变量
  • 支持进程共享(PTHREAD_PROCESS_SHARED)
  • 实现在共享内存中

Windows

  • 基于Windows条件变量API
  • 通过命名对象实现进程间同步

相关文档

  • ipc::sync::mutex - 进程间互斥锁(必须配合使用)
  • ipc::sync::semaphore - 进程间信号量

Clone this wiki locally