diff options
Diffstat (limited to 'c_src/linux/workqueue.c')
-rw-r--r-- | c_src/linux/workqueue.c | 346 |
1 files changed, 346 insertions, 0 deletions
diff --git a/c_src/linux/workqueue.c b/c_src/linux/workqueue.c new file mode 100644 index 00000000..0d5af3fb --- /dev/null +++ b/c_src/linux/workqueue.c @@ -0,0 +1,346 @@ +#include <pthread.h> + +#include <linux/kthread.h> +#include <linux/slab.h> +#include <linux/workqueue.h> + +static pthread_mutex_t wq_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t work_finished = PTHREAD_COND_INITIALIZER; +static LIST_HEAD(wq_list); + +struct workqueue_struct { + struct list_head list; + + struct work_struct *current_work; + struct list_head pending_work; + + struct task_struct *worker; + char name[24]; +}; + +enum { + WORK_PENDING_BIT, +}; + +static bool work_pending(struct work_struct *work) +{ + return test_bit(WORK_PENDING_BIT, work_data_bits(work)); +} + +static void clear_work_pending(struct work_struct *work) +{ + clear_bit(WORK_PENDING_BIT, work_data_bits(work)); +} + +static bool set_work_pending(struct work_struct *work) +{ + return !test_and_set_bit(WORK_PENDING_BIT, work_data_bits(work)); +} + +static void __queue_work(struct workqueue_struct *wq, + struct work_struct *work) +{ + BUG_ON(!work_pending(work)); + BUG_ON(!list_empty(&work->entry)); + + list_add_tail(&work->entry, &wq->pending_work); + wake_up_process(wq->worker); +} + +bool queue_work(struct workqueue_struct *wq, struct work_struct *work) +{ + bool ret; + + pthread_mutex_lock(&wq_lock); + if ((ret = set_work_pending(work))) + __queue_work(wq, work); + pthread_mutex_unlock(&wq_lock); + + return ret; +} + +void delayed_work_timer_fn(struct timer_list *timer) +{ + struct delayed_work *dwork = + container_of(timer, struct delayed_work, timer); + + pthread_mutex_lock(&wq_lock); + __queue_work(dwork->wq, &dwork->work); + pthread_mutex_unlock(&wq_lock); +} + +static void __queue_delayed_work(struct workqueue_struct *wq, + struct delayed_work *dwork, + unsigned long delay) +{ + struct timer_list *timer = &dwork->timer; + struct work_struct *work = &dwork->work; + + BUG_ON(timer->function != delayed_work_timer_fn); + BUG_ON(timer_pending(timer)); + BUG_ON(!list_empty(&work->entry)); + + if (!delay) { + __queue_work(wq, &dwork->work); + } else { + dwork->wq = wq; + timer->expires = jiffies + delay; + add_timer(timer); + } +} + +bool queue_delayed_work(struct workqueue_struct *wq, + struct delayed_work *dwork, + unsigned long delay) +{ + struct work_struct *work = &dwork->work; + bool ret; + + pthread_mutex_lock(&wq_lock); + if ((ret = set_work_pending(work))) + __queue_delayed_work(wq, dwork, delay); + pthread_mutex_unlock(&wq_lock); + + return ret; +} + +static bool grab_pending(struct work_struct *work, bool is_dwork) +{ +retry: + if (set_work_pending(work)) { + BUG_ON(!list_empty(&work->entry)); + return false; + } + + if (is_dwork) { + struct delayed_work *dwork = to_delayed_work(work); + + if (likely(del_timer(&dwork->timer))) { + BUG_ON(!list_empty(&work->entry)); + return true; + } + } + + if (!list_empty(&work->entry)) { + list_del_init(&work->entry); + return true; + } + + BUG_ON(!is_dwork); + + pthread_mutex_unlock(&wq_lock); + flush_timers(); + pthread_mutex_lock(&wq_lock); + goto retry; +} + +static bool work_running(struct work_struct *work) +{ + struct workqueue_struct *wq; + + list_for_each_entry(wq, &wq_list, list) + if (wq->current_work == work) + return true; + + return false; +} + +bool flush_work(struct work_struct *work) +{ + bool ret = false; + + pthread_mutex_lock(&wq_lock); + while (work_pending(work) || work_running(work)) { + pthread_cond_wait(&work_finished, &wq_lock); + ret = true; + } + pthread_mutex_unlock(&wq_lock); + + return ret; +} + +static bool __flush_work(struct work_struct *work) +{ + bool ret = false; + + while (work_running(work)) { + pthread_cond_wait(&work_finished, &wq_lock); + ret = true; + } + + return ret; +} + +bool cancel_work_sync(struct work_struct *work) +{ + bool ret; + + pthread_mutex_lock(&wq_lock); + ret = grab_pending(work, false); + + __flush_work(work); + clear_work_pending(work); + pthread_mutex_unlock(&wq_lock); + + return ret; +} + +bool mod_delayed_work(struct workqueue_struct *wq, + struct delayed_work *dwork, + unsigned long delay) +{ + struct work_struct *work = &dwork->work; + bool ret; + + pthread_mutex_lock(&wq_lock); + ret = grab_pending(work, true); + + __queue_delayed_work(wq, dwork, delay); + pthread_mutex_unlock(&wq_lock); + + return ret; +} + +bool cancel_delayed_work(struct delayed_work *dwork) +{ + struct work_struct *work = &dwork->work; + bool ret; + + pthread_mutex_lock(&wq_lock); + ret = grab_pending(work, true); + + clear_work_pending(&dwork->work); + pthread_mutex_unlock(&wq_lock); + + return ret; +} + +bool cancel_delayed_work_sync(struct delayed_work *dwork) +{ + struct work_struct *work = &dwork->work; + bool ret; + + pthread_mutex_lock(&wq_lock); + ret = grab_pending(work, true); + + __flush_work(work); + clear_work_pending(work); + pthread_mutex_unlock(&wq_lock); + + return ret; +} + +static int worker_thread(void *arg) +{ + struct workqueue_struct *wq = arg; + struct work_struct *work; + + pthread_mutex_lock(&wq_lock); + while (1) { + __set_current_state(TASK_INTERRUPTIBLE); + work = list_first_entry_or_null(&wq->pending_work, + struct work_struct, entry); + wq->current_work = work; + + if (kthread_should_stop()) { + BUG_ON(wq->current_work); + break; + } + + if (!work) { + pthread_mutex_unlock(&wq_lock); + schedule(); + pthread_mutex_lock(&wq_lock); + continue; + } + + BUG_ON(!work_pending(work)); + list_del_init(&work->entry); + clear_work_pending(work); + + pthread_mutex_unlock(&wq_lock); + work->func(work); + pthread_mutex_lock(&wq_lock); + + pthread_cond_broadcast(&work_finished); + } + pthread_mutex_unlock(&wq_lock); + + return 0; +} + +void destroy_workqueue(struct workqueue_struct *wq) +{ + kthread_stop(wq->worker); + + pthread_mutex_lock(&wq_lock); + list_del(&wq->list); + pthread_mutex_unlock(&wq_lock); + + kfree(wq); +} + +struct workqueue_struct *alloc_workqueue(const char *fmt, + unsigned flags, + int max_active, + ...) +{ + va_list args; + struct workqueue_struct *wq; + + wq = kzalloc(sizeof(*wq), GFP_KERNEL); + if (!wq) + return NULL; + + INIT_LIST_HEAD(&wq->list); + INIT_LIST_HEAD(&wq->pending_work); + + va_start(args, max_active); + vsnprintf(wq->name, sizeof(wq->name), fmt, args); + va_end(args); + + wq->worker = kthread_run(worker_thread, wq, "%s", wq->name); + if (IS_ERR(wq->worker)) { + kfree(wq); + return NULL; + } + + pthread_mutex_lock(&wq_lock); + list_add(&wq->list, &wq_list); + pthread_mutex_unlock(&wq_lock); + + return wq; +} + +struct workqueue_struct *system_wq; +struct workqueue_struct *system_highpri_wq; +struct workqueue_struct *system_long_wq; +struct workqueue_struct *system_unbound_wq; +struct workqueue_struct *system_freezable_wq; + +__attribute__((constructor(102))) +static void wq_init(void) +{ + system_wq = alloc_workqueue("events", 0, 0); + system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0); + system_long_wq = alloc_workqueue("events_long", 0, 0); + system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, + WQ_UNBOUND_MAX_ACTIVE); + system_freezable_wq = alloc_workqueue("events_freezable", + WQ_FREEZABLE, 0); + BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq || + !system_unbound_wq || !system_freezable_wq); +} + +__attribute__((destructor(102))) +static void wq_cleanup(void) +{ + destroy_workqueue(system_freezable_wq); + destroy_workqueue(system_unbound_wq); + destroy_workqueue(system_long_wq); + destroy_workqueue(system_highpri_wq); + destroy_workqueue(system_wq); + + system_wq = system_highpri_wq = system_long_wq = system_unbound_wq = + system_freezable_wq = NULL; +} |