319 lines
9.1 KiB
C++
319 lines
9.1 KiB
C++
|
/*
|
||
|
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
|
||
|
*
|
||
|
* Use of this source code is governed by a BSD-style license
|
||
|
* that can be found in the LICENSE file in the root of the source
|
||
|
* tree. An additional intellectual property rights grant can be found
|
||
|
* in the file PATENTS. All contributing project authors may
|
||
|
* be found in the AUTHORS file in the root of the source tree.
|
||
|
*/
|
||
|
|
||
|
#include "webrtc/base/task_queue.h"
|
||
|
|
||
|
#include <fcntl.h>
|
||
|
#include <string.h>
|
||
|
#include <unistd.h>
|
||
|
|
||
|
#include "base/third_party/libevent/event.h"
|
||
|
#include "webrtc/base/checks.h"
|
||
|
#include "webrtc/base/logging.h"
|
||
|
#include "webrtc/base/task_queue_posix.h"
|
||
|
#include "webrtc/base/timeutils.h"
|
||
|
|
||
|
namespace rtc {
|
||
|
using internal::GetQueuePtrTls;
|
||
|
using internal::AutoSetCurrentQueuePtr;
|
||
|
|
||
|
namespace {
|
||
|
static const char kQuit = 1;
|
||
|
static const char kRunTask = 2;
|
||
|
|
||
|
struct TimerEvent {
|
||
|
explicit TimerEvent(std::unique_ptr<QueuedTask> task)
|
||
|
: task(std::move(task)) {}
|
||
|
~TimerEvent() { event_del(&ev); }
|
||
|
event ev;
|
||
|
std::unique_ptr<QueuedTask> task;
|
||
|
};
|
||
|
|
||
|
bool SetNonBlocking(int fd) {
|
||
|
const int flags = fcntl(fd, F_GETFL);
|
||
|
RTC_CHECK(flags != -1);
|
||
|
return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
|
||
|
}
|
||
|
} // namespace
|
||
|
|
||
|
struct TaskQueue::QueueContext {
|
||
|
explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
|
||
|
TaskQueue* queue;
|
||
|
bool is_active;
|
||
|
// Holds a list of events pending timers for cleanup when the loop exits.
|
||
|
std::list<TimerEvent*> pending_timers_;
|
||
|
};
|
||
|
|
||
|
class TaskQueue::PostAndReplyTask : public QueuedTask {
|
||
|
public:
|
||
|
PostAndReplyTask(std::unique_ptr<QueuedTask> task,
|
||
|
std::unique_ptr<QueuedTask> reply,
|
||
|
TaskQueue* reply_queue)
|
||
|
: task_(std::move(task)),
|
||
|
reply_(std::move(reply)),
|
||
|
reply_queue_(reply_queue) {
|
||
|
reply_queue->PrepareReplyTask(this);
|
||
|
}
|
||
|
|
||
|
~PostAndReplyTask() override {
|
||
|
CritScope lock(&lock_);
|
||
|
if (reply_queue_)
|
||
|
reply_queue_->ReplyTaskDone(this);
|
||
|
}
|
||
|
|
||
|
void OnReplyQueueGone() {
|
||
|
CritScope lock(&lock_);
|
||
|
reply_queue_ = nullptr;
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
bool Run() override {
|
||
|
if (!task_->Run())
|
||
|
task_.release();
|
||
|
|
||
|
CritScope lock(&lock_);
|
||
|
if (reply_queue_)
|
||
|
reply_queue_->PostTask(std::move(reply_));
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
CriticalSection lock_;
|
||
|
std::unique_ptr<QueuedTask> task_;
|
||
|
std::unique_ptr<QueuedTask> reply_;
|
||
|
TaskQueue* reply_queue_ GUARDED_BY(lock_);
|
||
|
};
|
||
|
|
||
|
class TaskQueue::SetTimerTask : public QueuedTask {
|
||
|
public:
|
||
|
SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
|
||
|
: task_(std::move(task)),
|
||
|
milliseconds_(milliseconds),
|
||
|
posted_(Time32()) {}
|
||
|
|
||
|
private:
|
||
|
bool Run() override {
|
||
|
// Compensate for the time that has passed since construction
|
||
|
// and until we got here.
|
||
|
uint32_t post_time = Time32() - posted_;
|
||
|
TaskQueue::Current()->PostDelayedTask(
|
||
|
std::move(task_),
|
||
|
post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
std::unique_ptr<QueuedTask> task_;
|
||
|
const uint32_t milliseconds_;
|
||
|
const uint32_t posted_;
|
||
|
};
|
||
|
|
||
|
TaskQueue::TaskQueue(const char* queue_name)
|
||
|
: event_base_(event_base_new()),
|
||
|
wakeup_event_(new event()),
|
||
|
thread_(&TaskQueue::ThreadMain, this, queue_name) {
|
||
|
RTC_DCHECK(queue_name);
|
||
|
int fds[2];
|
||
|
RTC_CHECK(pipe(fds) == 0);
|
||
|
SetNonBlocking(fds[0]);
|
||
|
SetNonBlocking(fds[1]);
|
||
|
wakeup_pipe_out_ = fds[0];
|
||
|
wakeup_pipe_in_ = fds[1];
|
||
|
event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
|
||
|
OnWakeup, this);
|
||
|
event_base_set(event_base_, wakeup_event_.get());
|
||
|
event_add(wakeup_event_.get(), 0);
|
||
|
thread_.Start();
|
||
|
}
|
||
|
|
||
|
TaskQueue::~TaskQueue() {
|
||
|
RTC_DCHECK(!IsCurrent());
|
||
|
struct timespec ts;
|
||
|
char message = kQuit;
|
||
|
while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
|
||
|
// The queue is full, so we have no choice but to wait and retry.
|
||
|
RTC_CHECK_EQ(EAGAIN, errno);
|
||
|
ts.tv_sec = 0;
|
||
|
ts.tv_nsec = 1000000;
|
||
|
nanosleep(&ts, nullptr);
|
||
|
}
|
||
|
|
||
|
thread_.Stop();
|
||
|
|
||
|
event_del(wakeup_event_.get());
|
||
|
close(wakeup_pipe_in_);
|
||
|
close(wakeup_pipe_out_);
|
||
|
wakeup_pipe_in_ = -1;
|
||
|
wakeup_pipe_out_ = -1;
|
||
|
|
||
|
{
|
||
|
// Synchronize against any pending reply tasks that might be running on
|
||
|
// other queues.
|
||
|
CritScope lock(&pending_lock_);
|
||
|
for (auto* reply : pending_replies_)
|
||
|
reply->OnReplyQueueGone();
|
||
|
pending_replies_.clear();
|
||
|
}
|
||
|
|
||
|
event_base_free(event_base_);
|
||
|
}
|
||
|
|
||
|
// static
|
||
|
TaskQueue* TaskQueue::Current() {
|
||
|
QueueContext* ctx =
|
||
|
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
|
||
|
return ctx ? ctx->queue : nullptr;
|
||
|
}
|
||
|
|
||
|
// static
|
||
|
bool TaskQueue::IsCurrent(const char* queue_name) {
|
||
|
TaskQueue* current = Current();
|
||
|
return current && current->thread_.name().compare(queue_name) == 0;
|
||
|
}
|
||
|
|
||
|
bool TaskQueue::IsCurrent() const {
|
||
|
return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
|
||
|
}
|
||
|
|
||
|
void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
|
||
|
RTC_DCHECK(task.get());
|
||
|
// libevent isn't thread safe. This means that we can't use methods such
|
||
|
// as event_base_once to post tasks to the worker thread from a different
|
||
|
// thread. However, we can use it when posting from the worker thread itself.
|
||
|
if (IsCurrent()) {
|
||
|
if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
|
||
|
task.get(), nullptr) == 0) {
|
||
|
task.release();
|
||
|
}
|
||
|
} else {
|
||
|
QueuedTask* task_id = task.get(); // Only used for comparison.
|
||
|
{
|
||
|
CritScope lock(&pending_lock_);
|
||
|
pending_.push_back(std::move(task));
|
||
|
}
|
||
|
char message = kRunTask;
|
||
|
if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
|
||
|
LOG(WARNING) << "Failed to queue task.";
|
||
|
CritScope lock(&pending_lock_);
|
||
|
pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
|
||
|
return t.get() == task_id;
|
||
|
});
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
||
|
uint32_t milliseconds) {
|
||
|
if (IsCurrent()) {
|
||
|
TimerEvent* timer = new TimerEvent(std::move(task));
|
||
|
evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer);
|
||
|
event_base_set(event_base_, &timer->ev);
|
||
|
QueueContext* ctx =
|
||
|
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
|
||
|
ctx->pending_timers_.push_back(timer);
|
||
|
timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
|
||
|
event_add(&timer->ev, &tv);
|
||
|
} else {
|
||
|
PostTask(std::unique_ptr<QueuedTask>(
|
||
|
new SetTimerTask(std::move(task), milliseconds)));
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
||
|
std::unique_ptr<QueuedTask> reply,
|
||
|
TaskQueue* reply_queue) {
|
||
|
std::unique_ptr<QueuedTask> wrapper_task(
|
||
|
new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
|
||
|
PostTask(std::move(wrapper_task));
|
||
|
}
|
||
|
|
||
|
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
||
|
std::unique_ptr<QueuedTask> reply) {
|
||
|
return PostTaskAndReply(std::move(task), std::move(reply), Current());
|
||
|
}
|
||
|
|
||
|
// static
|
||
|
bool TaskQueue::ThreadMain(void* context) {
|
||
|
TaskQueue* me = static_cast<TaskQueue*>(context);
|
||
|
|
||
|
QueueContext queue_context(me);
|
||
|
pthread_setspecific(GetQueuePtrTls(), &queue_context);
|
||
|
|
||
|
while (queue_context.is_active)
|
||
|
event_base_loop(me->event_base_, 0);
|
||
|
|
||
|
pthread_setspecific(GetQueuePtrTls(), nullptr);
|
||
|
|
||
|
for (TimerEvent* timer : queue_context.pending_timers_)
|
||
|
delete timer;
|
||
|
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
// static
|
||
|
void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
|
||
|
QueueContext* ctx =
|
||
|
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
|
||
|
RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
|
||
|
char buf;
|
||
|
RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
|
||
|
switch (buf) {
|
||
|
case kQuit:
|
||
|
ctx->is_active = false;
|
||
|
event_base_loopbreak(ctx->queue->event_base_);
|
||
|
break;
|
||
|
case kRunTask: {
|
||
|
std::unique_ptr<QueuedTask> task;
|
||
|
{
|
||
|
CritScope lock(&ctx->queue->pending_lock_);
|
||
|
RTC_DCHECK(!ctx->queue->pending_.empty());
|
||
|
task = std::move(ctx->queue->pending_.front());
|
||
|
ctx->queue->pending_.pop_front();
|
||
|
RTC_DCHECK(task.get());
|
||
|
}
|
||
|
if (!task->Run())
|
||
|
task.release();
|
||
|
break;
|
||
|
}
|
||
|
default:
|
||
|
RTC_NOTREACHED();
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// static
|
||
|
void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
|
||
|
auto* task = static_cast<QueuedTask*>(context);
|
||
|
if (task->Run())
|
||
|
delete task;
|
||
|
}
|
||
|
|
||
|
// static
|
||
|
void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
|
||
|
TimerEvent* timer = static_cast<TimerEvent*>(context);
|
||
|
if (!timer->task->Run())
|
||
|
timer->task.release();
|
||
|
QueueContext* ctx =
|
||
|
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
|
||
|
ctx->pending_timers_.remove(timer);
|
||
|
delete timer;
|
||
|
}
|
||
|
|
||
|
void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
|
||
|
RTC_DCHECK(reply_task);
|
||
|
CritScope lock(&pending_lock_);
|
||
|
pending_replies_.push_back(reply_task);
|
||
|
}
|
||
|
|
||
|
void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
|
||
|
CritScope lock(&pending_lock_);
|
||
|
pending_replies_.remove(reply_task);
|
||
|
}
|
||
|
|
||
|
} // namespace rtc
|