summaryrefslogtreecommitdiff
path: root/c_src/linux/workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'c_src/linux/workqueue.c')
-rw-r--r--c_src/linux/workqueue.c346
1 files changed, 346 insertions, 0 deletions
diff --git a/c_src/linux/workqueue.c b/c_src/linux/workqueue.c
new file mode 100644
index 00000000..0d5af3fb
--- /dev/null
+++ b/c_src/linux/workqueue.c
@@ -0,0 +1,346 @@
+#include <pthread.h>
+
+#include <linux/kthread.h>
+#include <linux/slab.h>
+#include <linux/workqueue.h>
+
+static pthread_mutex_t wq_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t work_finished = PTHREAD_COND_INITIALIZER;
+static LIST_HEAD(wq_list);
+
+struct workqueue_struct {
+ struct list_head list;
+
+ struct work_struct *current_work;
+ struct list_head pending_work;
+
+ struct task_struct *worker;
+ char name[24];
+};
+
+enum {
+ WORK_PENDING_BIT,
+};
+
+static bool work_pending(struct work_struct *work)
+{
+ return test_bit(WORK_PENDING_BIT, work_data_bits(work));
+}
+
+static void clear_work_pending(struct work_struct *work)
+{
+ clear_bit(WORK_PENDING_BIT, work_data_bits(work));
+}
+
+static bool set_work_pending(struct work_struct *work)
+{
+ return !test_and_set_bit(WORK_PENDING_BIT, work_data_bits(work));
+}
+
+static void __queue_work(struct workqueue_struct *wq,
+ struct work_struct *work)
+{
+ BUG_ON(!work_pending(work));
+ BUG_ON(!list_empty(&work->entry));
+
+ list_add_tail(&work->entry, &wq->pending_work);
+ wake_up_process(wq->worker);
+}
+
+bool queue_work(struct workqueue_struct *wq, struct work_struct *work)
+{
+ bool ret;
+
+ pthread_mutex_lock(&wq_lock);
+ if ((ret = set_work_pending(work)))
+ __queue_work(wq, work);
+ pthread_mutex_unlock(&wq_lock);
+
+ return ret;
+}
+
+void delayed_work_timer_fn(struct timer_list *timer)
+{
+ struct delayed_work *dwork =
+ container_of(timer, struct delayed_work, timer);
+
+ pthread_mutex_lock(&wq_lock);
+ __queue_work(dwork->wq, &dwork->work);
+ pthread_mutex_unlock(&wq_lock);
+}
+
+static void __queue_delayed_work(struct workqueue_struct *wq,
+ struct delayed_work *dwork,
+ unsigned long delay)
+{
+ struct timer_list *timer = &dwork->timer;
+ struct work_struct *work = &dwork->work;
+
+ BUG_ON(timer->function != delayed_work_timer_fn);
+ BUG_ON(timer_pending(timer));
+ BUG_ON(!list_empty(&work->entry));
+
+ if (!delay) {
+ __queue_work(wq, &dwork->work);
+ } else {
+ dwork->wq = wq;
+ timer->expires = jiffies + delay;
+ add_timer(timer);
+ }
+}
+
+bool queue_delayed_work(struct workqueue_struct *wq,
+ struct delayed_work *dwork,
+ unsigned long delay)
+{
+ struct work_struct *work = &dwork->work;
+ bool ret;
+
+ pthread_mutex_lock(&wq_lock);
+ if ((ret = set_work_pending(work)))
+ __queue_delayed_work(wq, dwork, delay);
+ pthread_mutex_unlock(&wq_lock);
+
+ return ret;
+}
+
+static bool grab_pending(struct work_struct *work, bool is_dwork)
+{
+retry:
+ if (set_work_pending(work)) {
+ BUG_ON(!list_empty(&work->entry));
+ return false;
+ }
+
+ if (is_dwork) {
+ struct delayed_work *dwork = to_delayed_work(work);
+
+ if (likely(del_timer(&dwork->timer))) {
+ BUG_ON(!list_empty(&work->entry));
+ return true;
+ }
+ }
+
+ if (!list_empty(&work->entry)) {
+ list_del_init(&work->entry);
+ return true;
+ }
+
+ BUG_ON(!is_dwork);
+
+ pthread_mutex_unlock(&wq_lock);
+ flush_timers();
+ pthread_mutex_lock(&wq_lock);
+ goto retry;
+}
+
+static bool work_running(struct work_struct *work)
+{
+ struct workqueue_struct *wq;
+
+ list_for_each_entry(wq, &wq_list, list)
+ if (wq->current_work == work)
+ return true;
+
+ return false;
+}
+
+bool flush_work(struct work_struct *work)
+{
+ bool ret = false;
+
+ pthread_mutex_lock(&wq_lock);
+ while (work_pending(work) || work_running(work)) {
+ pthread_cond_wait(&work_finished, &wq_lock);
+ ret = true;
+ }
+ pthread_mutex_unlock(&wq_lock);
+
+ return ret;
+}
+
+static bool __flush_work(struct work_struct *work)
+{
+ bool ret = false;
+
+ while (work_running(work)) {
+ pthread_cond_wait(&work_finished, &wq_lock);
+ ret = true;
+ }
+
+ return ret;
+}
+
+bool cancel_work_sync(struct work_struct *work)
+{
+ bool ret;
+
+ pthread_mutex_lock(&wq_lock);
+ ret = grab_pending(work, false);
+
+ __flush_work(work);
+ clear_work_pending(work);
+ pthread_mutex_unlock(&wq_lock);
+
+ return ret;
+}
+
+bool mod_delayed_work(struct workqueue_struct *wq,
+ struct delayed_work *dwork,
+ unsigned long delay)
+{
+ struct work_struct *work = &dwork->work;
+ bool ret;
+
+ pthread_mutex_lock(&wq_lock);
+ ret = grab_pending(work, true);
+
+ __queue_delayed_work(wq, dwork, delay);
+ pthread_mutex_unlock(&wq_lock);
+
+ return ret;
+}
+
+bool cancel_delayed_work(struct delayed_work *dwork)
+{
+ struct work_struct *work = &dwork->work;
+ bool ret;
+
+ pthread_mutex_lock(&wq_lock);
+ ret = grab_pending(work, true);
+
+ clear_work_pending(&dwork->work);
+ pthread_mutex_unlock(&wq_lock);
+
+ return ret;
+}
+
+bool cancel_delayed_work_sync(struct delayed_work *dwork)
+{
+ struct work_struct *work = &dwork->work;
+ bool ret;
+
+ pthread_mutex_lock(&wq_lock);
+ ret = grab_pending(work, true);
+
+ __flush_work(work);
+ clear_work_pending(work);
+ pthread_mutex_unlock(&wq_lock);
+
+ return ret;
+}
+
+static int worker_thread(void *arg)
+{
+ struct workqueue_struct *wq = arg;
+ struct work_struct *work;
+
+ pthread_mutex_lock(&wq_lock);
+ while (1) {
+ __set_current_state(TASK_INTERRUPTIBLE);
+ work = list_first_entry_or_null(&wq->pending_work,
+ struct work_struct, entry);
+ wq->current_work = work;
+
+ if (kthread_should_stop()) {
+ BUG_ON(wq->current_work);
+ break;
+ }
+
+ if (!work) {
+ pthread_mutex_unlock(&wq_lock);
+ schedule();
+ pthread_mutex_lock(&wq_lock);
+ continue;
+ }
+
+ BUG_ON(!work_pending(work));
+ list_del_init(&work->entry);
+ clear_work_pending(work);
+
+ pthread_mutex_unlock(&wq_lock);
+ work->func(work);
+ pthread_mutex_lock(&wq_lock);
+
+ pthread_cond_broadcast(&work_finished);
+ }
+ pthread_mutex_unlock(&wq_lock);
+
+ return 0;
+}
+
+void destroy_workqueue(struct workqueue_struct *wq)
+{
+ kthread_stop(wq->worker);
+
+ pthread_mutex_lock(&wq_lock);
+ list_del(&wq->list);
+ pthread_mutex_unlock(&wq_lock);
+
+ kfree(wq);
+}
+
+struct workqueue_struct *alloc_workqueue(const char *fmt,
+ unsigned flags,
+ int max_active,
+ ...)
+{
+ va_list args;
+ struct workqueue_struct *wq;
+
+ wq = kzalloc(sizeof(*wq), GFP_KERNEL);
+ if (!wq)
+ return NULL;
+
+ INIT_LIST_HEAD(&wq->list);
+ INIT_LIST_HEAD(&wq->pending_work);
+
+ va_start(args, max_active);
+ vsnprintf(wq->name, sizeof(wq->name), fmt, args);
+ va_end(args);
+
+ wq->worker = kthread_run(worker_thread, wq, "%s", wq->name);
+ if (IS_ERR(wq->worker)) {
+ kfree(wq);
+ return NULL;
+ }
+
+ pthread_mutex_lock(&wq_lock);
+ list_add(&wq->list, &wq_list);
+ pthread_mutex_unlock(&wq_lock);
+
+ return wq;
+}
+
+struct workqueue_struct *system_wq;
+struct workqueue_struct *system_highpri_wq;
+struct workqueue_struct *system_long_wq;
+struct workqueue_struct *system_unbound_wq;
+struct workqueue_struct *system_freezable_wq;
+
+__attribute__((constructor(102)))
+static void wq_init(void)
+{
+ system_wq = alloc_workqueue("events", 0, 0);
+ system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
+ system_long_wq = alloc_workqueue("events_long", 0, 0);
+ system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
+ WQ_UNBOUND_MAX_ACTIVE);
+ system_freezable_wq = alloc_workqueue("events_freezable",
+ WQ_FREEZABLE, 0);
+ BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
+ !system_unbound_wq || !system_freezable_wq);
+}
+
+__attribute__((destructor(102)))
+static void wq_cleanup(void)
+{
+ destroy_workqueue(system_freezable_wq);
+ destroy_workqueue(system_unbound_wq);
+ destroy_workqueue(system_long_wq);
+ destroy_workqueue(system_highpri_wq);
+ destroy_workqueue(system_wq);
+
+ system_wq = system_highpri_wq = system_long_wq = system_unbound_wq =
+ system_freezable_wq = NULL;
+}