developer tip

C ++ 11 스레드로부터 안전한 큐

optionbox 2020. 12. 1. 07:54
반응형

C ++ 11 스레드로부터 안전한 큐


내가 작업중인 프로젝트는 여러 스레드를 사용하여 파일 모음에 대한 작업을 수행합니다. 각 스레드는 처리 할 파일 목록에 파일을 추가 할 수 있으므로 스레드로부터 안전한 큐를 만들었습니다. 관련 부분은 다음과 같습니다.

// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
    std::lock_guard<std::mutex> lock(qMutex);
    q.push(std::move(filename));

    // Notify anyone waiting for additional files that more have arrived
    populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    if (q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
        else {
            return std::string();
        }
    }
    else {
        std::string ret = q.front();
        q.pop();
        return ret;
    }
}

그러나 나는 때때로 if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }블록 내부에서 segfault를 수행하고 있으며 gdb에서 검사하면 큐가 비어 있기 때문에 segfault가 발생하고 있음을 나타냅니다. 이것이 어떻게 가능한지? 알림을 받았을 wait_for때만 반환 된다는 것이 내 이해였으며 이것은 새 항목을 대기열에 방금 밀어 넣은 cv_status::no_timeout후에 만 발생해야합니다 FileQueue::enqueue.


표준에 따르면 condition_variables이벤트가 발생하지 않은 경우에도 허위로 깨울 수 있습니다. 스퓨리어스 웨이크 업의 경우 cv_status::no_timeout알림을받지 않았더라도 (시간 초과 대신 깨어 났기 때문에) 반환됩니다 . 이에 대한 올바른 해결책은 진행하기 전에 웨이크 업이 실제로 합법적인지 확인하는 것입니다.

세부 사항은 표준 §30.5.1 [thread.condition.condvar]에 지정되어 있습니다.

-이 함수는 notify_one () 호출, notify_all () 호출, abs_time으로 지정된 절대 시간 제한 (30.2.4) 만료 또는 가짜로 신호를 받으면 차단 해제됩니다.

...

반환 값 : abs_time에 지정된 절대 시간 초과 (30.2.4)가 만료 된 경우 cv_status :: timeout, other-ise cv_status :: no_timeout.


보기 만해도 조건 변수를 확인할 때 while 루프를 사용하는 것이 가장 좋습니다 (깨어나고 여전히 유효하지 않은 경우 다시 확인). 방금 비동기 대기열에 대한 템플릿을 작성했는데 이것이 도움이되기를 바랍니다.

#ifndef SAFE_QUEUE
#define SAFE_QUEUE

#include <queue>
#include <mutex>
#include <condition_variable>

// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
  SafeQueue(void)
    : q()
    , m()
    , c()
  {}

  ~SafeQueue(void)
  {}

  // Add an element to the queue.
  void enqueue(T t)
  {
    std::lock_guard<std::mutex> lock(m);
    q.push(t);
    c.notify_one();
  }

  // Get the "front"-element.
  // If the queue is empty, wait till a element is avaiable.
  T dequeue(void)
  {
    std::unique_lock<std::mutex> lock(m);
    while(q.empty())
    {
      // release lock as long as the wait and reaquire it afterwards.
      c.wait(lock);
    }
    T val = q.front();
    q.pop();
    return val;
  }

private:
  std::queue<T> q;
  mutable std::mutex m;
  std::condition_variable c;
};
#endif

이것은 아마도 당신이 그것을 해야하는 방법입니다.

void push(std::string&& filename)
{
    {
        std::lock_guard<std::mutex> lock(qMutex);

        q.push(std::move(filename));
    }

    populatedNotifier.notify_one();
}

bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);

    if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
        return false;

    filename = std::move(q.front());
    q.pop();

    return true;    
}

허용되는 답변에 추가하면 올바른 다중 생산자 / 다중 소비자 대기열을 구현하는 것이 어렵다고 말할 수 있습니다 (C ++ 11 이후로 더 쉬움)

나는 당신이 (매우 좋은) lock free boost 라이브러리 를 시도해 볼 것을 제안 할 것이다 . "queue"구조는 당신이 원하는 것을 할 것이다. wait-free / lock-free 보장과 함께 C ++ 11 컴파일러가 필요 없다 .

잠금없는 라이브러리가 상당히 새로운 것이기 때문에 지금이 답변을 추가하고 있습니다 (내가 믿는 1.53 이후)


대기열에서 빼기 기능을 다음과 같이 다시 작성합니다.

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    while(q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) 
           return std::string();
    }
    std::string ret = q.front();
    q.pop();
    return ret;
}

It is shorter and does not have duplicate code like your did. Only issue it may wait longer that timeout. To prevent that you would need to remember start time before loop, check for timeout and adjust wait time accordingly. Or specify absolute time on wait condition.


There is also GLib solution for this case, I did not try it yet, but I believe it is a good solution. https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new


BlockingCollection is a C++11 thread safe collection class that provides support for queue, stack and priority containers. It handles the "empty" queue scenario you described. As well as a "full" queue.


You may like lfqueue, https://github.com/Taymindis/lfqueue. It’s lock free concurrent queue. I’m currently using it to consuming the queue from multiple incoming calls and works like a charm.

참고URL : https://stackoverflow.com/questions/15278343/c11-thread-safe-queue

반응형