Skip to content

ipc::sync::semaphore

mutouyun edited this page Dec 9, 2025 · 1 revision

进程间信号量,用于控制对共享资源的访问数量。支持命名和匿名两种模式。

namespace ipc {
namespace sync {

class semaphore {
public:
    semaphore();
    explicit semaphore(char const *name, std::uint32_t count = 0);
    ~semaphore();

    void const *native() const noexcept;
    void *native() noexcept;

    bool valid() const noexcept;

    bool open(char const *name, std::uint32_t count = 0) noexcept;
    void close() noexcept;

    void clear() noexcept;
    static void clear_storage(char const * name) noexcept;

    bool wait(std::uint64_t tm = ipc::invalid_value) noexcept;
    bool post(std::uint32_t count = 1) noexcept;
};

} // namespace sync
} // namespace ipc
成员 说明
semaphore 构造函数
~semaphore 析构函数
open 打开一个命名的semaphore
close 关闭semaphore
clear 清理semaphore资源
clear_storage 清理指定名称的semaphore存储
wait 等待信号量(P操作,减1)
post 释放信号量(V操作,加N)
valid 检查semaphore是否有效
native 获取原生句柄

构造函数

semaphore

/*A*/ semaphore();
/*B*/ explicit semaphore(char const *name, std::uint32_t count = 0);
  • A. 默认构造函数,创建一个匿名semaphore(仅用于进程内同步)
  • B. 创建或打开一个命名的semaphore(用于进程间同步)
参数 说明
name char const *,semaphore的名称。使用相同名称可以在不同进程间共享semaphore
count std::uint32_t,初始计数值。默认为0

~semaphore

~semaphore();

析构函数。自动释放semaphore资源。

资源管理

open

bool open(char const *name, std::uint32_t count = 0) noexcept;

打开一个命名的semaphore。如果semaphore不存在,会自动创建并设置初始计数值。

参数 说明
name char const *,semaphore的名称
count std::uint32_t,初始计数值(仅在创建时有效)
返回值 说明
true 打开成功
false 打开失败

close

void close() noexcept;

关闭semaphore。释放相关资源。

clear

void clear() noexcept;

清理已打开semaphore的所有资源,包括共享内存。

clear_storage

static void clear_storage(char const * name) noexcept;

静态方法。清理指定名称的semaphore存储资源。

参数 说明
name char const *,要清理的semaphore名称

信号量操作

wait

bool wait(std::uint64_t tm = ipc::invalid_value) noexcept;

等待信号量(P操作,减1)。如果当前计数为0,则阻塞等待直到计数大于0或超时。

参数 说明
tm std::uint64_t,超时时间(毫秒)。默认为ipc::invalid_value(无限等待)
返回值 说明
true 成功获得信号量(计数减1)
false 超时未获得信号量

post

bool post(std::uint32_t count = 1) noexcept;

释放信号量(V操作,加N)。增加信号量的计数,唤醒等待的线程/进程。

参数 说明
count std::uint32_t,要增加的计数值。默认为1
返回值 说明
true 操作成功
false 操作失败

状态查询

valid

bool valid() const noexcept;

检查semaphore是否有效(已打开)。

返回值 说明
true semaphore有效
false semaphore无效或未打开

native

/*A*/ void const *native() const noexcept;
/*B*/ void *native() noexcept;

获取平台相关的原生semaphore句柄。

使用示例

生产者-消费者模式

#include "libipc/semaphore.h"
#include "libipc/shm.h"
#include <iostream>
#include <thread>

constexpr int BUFFER_SIZE = 10;

struct shared_buffer {
    int data[BUFFER_SIZE];
    int in;   // 生产者写入位置
    int out;  // 消费者读取位置
};

// 生产者
void producer(int id) {
    ipc::sync::semaphore empty("empty_slots", BUFFER_SIZE);  // 空槽位数
    ipc::sync::semaphore full("full_slots", 0);              // 满槽位数
    ipc::sync::mutex mtx("buffer_mutex");
    
    ipc::shm::handle shm("shared_buffer", sizeof(shared_buffer));
    auto* buffer = static_cast<shared_buffer*>(shm.get());
    
    for (int i = 0; i < 20; ++i) {
        int item = id * 1000 + i;
        
        // 等待空槽位
        if (!empty.wait(5000)) {
            std::cout << "Producer " << id << ": timeout waiting for empty slot" << std::endl;
            continue;
        }
        
        // 加锁访问共享缓冲区
        mtx.lock();
        buffer->data[buffer->in] = item;
        buffer->in = (buffer->in + 1) % BUFFER_SIZE;
        std::cout << "Producer " << id << " produced: " << item << std::endl;
        mtx.unlock();
        
        // 增加满槽位计数
        full.post();
        
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

// 消费者
void consumer(int id) {
    ipc::sync::semaphore empty("empty_slots", BUFFER_SIZE);
    ipc::sync::semaphore full("full_slots", 0);
    ipc::sync::mutex mtx("buffer_mutex");
    
    ipc::shm::handle shm("shared_buffer", sizeof(shared_buffer));
    auto* buffer = static_cast<shared_buffer*>(shm.get());
    
    for (int i = 0; i < 20; ++i) {
        // 等待满槽位
        if (!full.wait(5000)) {
            std::cout << "Consumer " << id << ": timeout waiting for full slot" << std::endl;
            continue;
        }
        
        // 加锁访问共享缓冲区
        mtx.lock();
        int item = buffer->data[buffer->out];
        buffer->out = (buffer->out + 1) % BUFFER_SIZE;
        std::cout << "Consumer " << id << " consumed: " << item << std::endl;
        mtx.unlock();
        
        // 增加空槽位计数
        empty.post();
        
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
    }
}

int main() {
    // 初始化共享缓冲区
    {
        ipc::shm::handle shm("shared_buffer", sizeof(shared_buffer));
        auto* buffer = static_cast<shared_buffer*>(shm.get());
        buffer->in = 0;
        buffer->out = 0;
    }
    
    std::thread p1(producer, 1);
    std::thread p2(producer, 2);
    std::thread c1(consumer, 1);
    std::thread c2(consumer, 2);
    
    p1.join();
    p2.join();
    c1.join();
    c2.join();
    
    // 清理
    ipc::sync::semaphore::clear_storage("empty_slots");
    ipc::sync::semaphore::clear_storage("full_slots");
    ipc::sync::mutex::clear_storage("buffer_mutex");
    
    return 0;
}

资源池管理

// 限制同时访问资源的进程/线程数量
class resource_pool {
    ipc::sync::semaphore sem_;
    
public:
    explicit resource_pool(const char* name, int max_users)
        : sem_(name, max_users) {}
    
    bool acquire(std::uint64_t timeout_ms = ipc::invalid_value) {
        return sem_.wait(timeout_ms);
    }
    
    void release() {
        sem_.post();
    }
};

// 使用示例
void worker(int id, resource_pool& pool) {
    if (pool.acquire(2000)) {  // 最多等待2秒
        std::cout << "Worker " << id << " acquired resource" << std::endl;
        
        // 使用资源
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        
        pool.release();
        std::cout << "Worker " << id << " released resource" << std::endl;
    } else {
        std::cout << "Worker " << id << " timeout" << std::endl;
    }
}

int main() {
    resource_pool pool("db_connections", 3);  // 最多3个并发连接
    
    std::vector<std::thread> workers;
    for (int i = 0; i < 10; ++i) {
        workers.emplace_back(worker, i, std::ref(pool));
    }
    
    for (auto& t : workers) {
        t.join();
    }
    
    return 0;
}

事件通知

// 使用信号量作为事件通知机制
void event_waiter() {
    ipc::sync::semaphore event("event_signal", 0);
    
    std::cout << "Waiting for event..." << std::endl;
    if (event.wait(5000)) {
        std::cout << "Event received!" << std::endl;
    } else {
        std::cout << "Event timeout!" << std::endl;
    }
}

void event_signaler() {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    
    ipc::sync::semaphore event("event_signal", 0);
    std::cout << "Signaling event..." << std::endl;
    event.post();  // 触发事件
}

int main() {
    std::thread waiter(event_waiter);
    std::thread signaler(event_signaler);
    
    waiter.join();
    signaler.join();
    
    ipc::sync::semaphore::clear_storage("event_signal");
    
    return 0;
}

多次释放信号量

// 一次性唤醒多个等待者
void batch_wake_up() {
    ipc::sync::semaphore sem("batch_sem", 0);
    
    // 启动5个等待者
    std::vector<std::thread> waiters;
    for (int i = 0; i < 5; ++i) {
        waiters.emplace_back([i, &sem]() {
            std::cout << "Waiter " << i << " waiting..." << std::endl;
            sem.wait();
            std::cout << "Waiter " << i << " woke up!" << std::endl;
        });
    }
    
    std::this_thread::sleep_for(std::chrono::seconds(1));
    
    // 一次性唤醒所有等待者
    std::cout << "Waking up all waiters..." << std::endl;
    sem.post(5);  // 增加5个计数
    
    for (auto& t : waiters) {
        t.join();
    }
    
    ipc::sync::semaphore::clear_storage("batch_sem");
}

常见使用模式

互斥锁模拟

// 使用信号量模拟互斥锁(初始值为1)
ipc::sync::semaphore mutex_sem("mutex_sim", 1);

// 加锁
mutex_sem.wait();
// 临界区
// ...
// 解锁
mutex_sem.post();

读写者模式

// 控制读者数量
ipc::sync::semaphore read_sem("readers", 5);  // 最多5个并发读者

// 读者
read_sem.wait();
// 读取数据
// ...
read_sem.post();

屏障同步

// 等待所有线程/进程到达屏障点
void barrier_sync(int thread_id, int total_threads) {
    static ipc::sync::semaphore barrier("barrier_sem", 0);
    static ipc::sync::mutex counter_mtx("counter_mtx");
    static ipc::shm::handle shm("barrier_counter", sizeof(int));
    
    auto* count = static_cast<int*>(shm.get());
    
    // 增加到达计数
    counter_mtx.lock();
    (*count)++;
    bool is_last = (*count == total_threads);
    counter_mtx.unlock();
    
    if (is_last) {
        // 最后一个到达的线程唤醒所有等待者
        std::cout << "Thread " << thread_id << " is last, releasing all" << std::endl;
        barrier.post(total_threads - 1);
    } else {
        // 其他线程等待
        std::cout << "Thread " << thread_id << " waiting at barrier" << std::endl;
        barrier.wait();
    }
    
    std::cout << "Thread " << thread_id << " passed barrier" << std::endl;
}

平台差异

Linux / FreeBSD / QNX

  • 基于POSIX命名信号量
  • 信号量名称需要以'/'开头
  • 使用sem_open(), sem_wait(), sem_post()等API

Windows

  • 基于Windows Semaphore对象
  • 通过命名Semaphore实现进程间同步
  • 最大计数值受系统限制

注意事项

  • 初始计数值
    • 用作互斥锁时,设置为1
    • 用作资源池时,设置为资源数量
    • 用作事件通知时,设置为0
  • 死锁避免:确保每个wait()都有对应的post()
  • 计数溢出:注意不要让计数值超过平台限制
  • 清理资源:程序退出前调用clear()clear_storage()
  • 超时使用:生产环境建议使用明确的超时值而非无限等待
  • 命名规范:不同的semaphore使用不同的名称以避免冲突
  • 原子性wait()post()操作都是原子的

相关文档

  • ipc::sync::mutex - 进程间互斥锁
  • ipc::sync::condition - 进程间条件变量

Clone this wiki locally