diff options
Diffstat (limited to 'linux/closure.c')
-rw-r--r-- | linux/closure.c | 195 |
1 files changed, 97 insertions, 98 deletions
diff --git a/linux/closure.c b/linux/closure.c index 4fb78d18..f1b4a797 100644 --- a/linux/closure.c +++ b/linux/closure.c @@ -13,65 +13,83 @@ #include <linux/seq_file.h> #include <linux/sched/debug.h> -static inline void closure_put_after_sub_checks(struct closure *cl, int flags) +static void closure_val_checks(struct closure *cl, unsigned new, int d) { - int r = flags & CLOSURE_REMAINING_MASK; + unsigned count = new & CLOSURE_REMAINING_MASK; - if (WARN(flags & CLOSURE_GUARD_MASK, - "closure %ps has guard bits set: %x (%u)", + if (WARN(new & CLOSURE_GUARD_MASK, + "closure %ps has guard bits set: %x (%u), delta %i", cl->fn, - flags & CLOSURE_GUARD_MASK, (unsigned) __fls(r))) - r &= ~CLOSURE_GUARD_MASK; + new, (unsigned) __fls(new & CLOSURE_GUARD_MASK), d)) + new &= ~CLOSURE_GUARD_MASK; - WARN(!r && (flags & ~CLOSURE_DESTRUCTOR), + WARN(!count && (new & ~(CLOSURE_DESTRUCTOR|CLOSURE_SLEEPING)), "closure %ps ref hit 0 with incorrect flags set: %x (%u)", cl->fn, - flags & ~CLOSURE_DESTRUCTOR, (unsigned) __fls(flags)); + new, (unsigned) __fls(new)); } -static inline void closure_put_after_sub(struct closure *cl, int flags) +enum new_closure_state { + CLOSURE_normal_put, + CLOSURE_requeue, + CLOSURE_done, +}; + +/* For clearing flags with the same atomic op as a put */ +void closure_sub(struct closure *cl, int v) { - closure_put_after_sub_checks(cl, flags); + enum new_closure_state s; + struct task_struct *sleeper; - if (!(flags & CLOSURE_REMAINING_MASK)) { - smp_acquire__after_ctrl_dep(); + /* rcu_read_lock, atomic_read_acquire() are both for cl->sleeper: */ + guard(rcu)(); - cl->closure_get_happened = false; + int old = atomic_read_acquire(&cl->remaining), new; + do { + new = old - v; - if (cl->fn && !(flags & CLOSURE_DESTRUCTOR)) { - atomic_set(&cl->remaining, - CLOSURE_REMAINING_INITIALIZER); - closure_queue(cl); + if (new & CLOSURE_REMAINING_MASK) { + s = CLOSURE_normal_put; } else { - struct closure *parent = cl->parent; - closure_fn *destructor = cl->fn; + if ((cl->fn || (new & CLOSURE_SLEEPING)) && + !(new & CLOSURE_DESTRUCTOR)) { + s = CLOSURE_requeue; + new += CLOSURE_REMAINING_INITIALIZER; + } else + s = CLOSURE_done; - closure_debug_destroy(cl); + sleeper = new & CLOSURE_SLEEPING ? cl->sleeper : NULL; + new &= ~CLOSURE_SLEEPING; + } - if (destructor) - destructor(&cl->work); + closure_val_checks(cl, new, -v); + } while (!atomic_try_cmpxchg_release(&cl->remaining, &old, new)); - if (parent) - closure_put(parent); - } + if (s == CLOSURE_normal_put) + return; + + if (sleeper) { + smp_mb(); + wake_up_process(sleeper); + return; } -} -/* For clearing flags with the same atomic op as a put */ -void closure_sub(struct closure *cl, int v) -{ - closure_put_after_sub(cl, atomic_sub_return_release(v, &cl->remaining)); -} -EXPORT_SYMBOL(closure_sub); + if (s == CLOSURE_requeue) { + closure_queue(cl); + } else { + struct closure *parent = cl->parent; + closure_fn *destructor = cl->fn; -/* - * closure_put - decrement a closure's refcount - */ -void closure_put(struct closure *cl) -{ - closure_put_after_sub(cl, atomic_dec_return_release(&cl->remaining)); + closure_debug_destroy(cl); + + if (destructor) + destructor(&cl->work); + + if (parent) + closure_put(parent); + } } -EXPORT_SYMBOL(closure_put); +EXPORT_SYMBOL(closure_sub); /* * closure_wake_up - wake up all closures on a wait list, without memory barrier @@ -107,43 +125,26 @@ bool closure_wait(struct closure_waitlist *waitlist, struct closure *cl) if (atomic_read(&cl->remaining) & CLOSURE_WAITING) return false; - cl->closure_get_happened = true; closure_set_waiting(cl, _RET_IP_); - atomic_add(CLOSURE_WAITING + 1, &cl->remaining); + unsigned r = atomic_add_return(CLOSURE_WAITING + 1, &cl->remaining); + closure_val_checks(cl, r, CLOSURE_WAITING + 1); + llist_add(&cl->list, &waitlist->list); return true; } EXPORT_SYMBOL(closure_wait); -struct closure_syncer { - struct task_struct *task; - int done; -}; - -static CLOSURE_CALLBACK(closure_sync_fn) -{ - struct closure *cl = container_of(ws, struct closure, work); - struct closure_syncer *s = cl->s; - struct task_struct *p; - - rcu_read_lock(); - p = READ_ONCE(s->task); - s->done = 1; - wake_up_process(p); - rcu_read_unlock(); -} - void __sched __closure_sync(struct closure *cl) { - struct closure_syncer s = { .task = current }; - - cl->s = &s; - continue_at(cl, closure_sync_fn, NULL); + cl->sleeper = current; + closure_sub(cl, + CLOSURE_REMAINING_INITIALIZER - + CLOSURE_SLEEPING); while (1) { set_current_state(TASK_UNINTERRUPTIBLE); - if (s.done) + if (!(atomic_read(&cl->remaining) & CLOSURE_SLEEPING)) break; schedule(); } @@ -157,31 +158,25 @@ EXPORT_SYMBOL(__closure_sync); * for outstanding get()s to finish) and returning once closure refcount is 0. * * Unlike closure_sync() this doesn't reinit the ref to 1; subsequent - * closure_get_not_zero() calls waill fail. + * closure_get_not_zero() calls will fail. */ void __sched closure_return_sync(struct closure *cl) { - struct closure_syncer s = { .task = current }; - - cl->s = &s; - set_closure_fn(cl, closure_sync_fn, NULL); - - unsigned flags = atomic_sub_return_release(1 + CLOSURE_RUNNING - CLOSURE_DESTRUCTOR, - &cl->remaining); - - closure_put_after_sub_checks(cl, flags); - - if (unlikely(flags & CLOSURE_REMAINING_MASK)) { - while (1) { - set_current_state(TASK_UNINTERRUPTIBLE); - if (s.done) - break; - schedule(); - } + cl->sleeper = current; + closure_sub(cl, + CLOSURE_REMAINING_INITIALIZER - + CLOSURE_DESTRUCTOR - + CLOSURE_SLEEPING); - __set_current_state(TASK_RUNNING); + while (1) { + set_current_state(TASK_UNINTERRUPTIBLE); + if (!(atomic_read(&cl->remaining) & CLOSURE_SLEEPING)) + break; + schedule(); } + __set_current_state(TASK_RUNNING); + if (cl->parent) closure_put(cl->parent); } @@ -189,31 +184,35 @@ EXPORT_SYMBOL(closure_return_sync); int __sched __closure_sync_timeout(struct closure *cl, unsigned long timeout) { - struct closure_syncer s = { .task = current }; int ret = 0; - cl->s = &s; - continue_at(cl, closure_sync_fn, NULL); + cl->sleeper = current; + closure_sub(cl, + CLOSURE_REMAINING_INITIALIZER - + CLOSURE_SLEEPING); while (1) { set_current_state(TASK_UNINTERRUPTIBLE); - if (s.done) - break; + /* + * Carefully undo the continue_at() - but only if it + * hasn't completed, i.e. the final closure_put() hasn't + * happened yet: + */ + unsigned old = atomic_read(&cl->remaining), new; + if (!(old & CLOSURE_SLEEPING)) + goto success; + if (!timeout) { - /* - * Carefully undo the continue_at() - but only if it - * hasn't completed, i.e. the final closure_put() hasn't - * happened yet: - */ - unsigned old, new, v = atomic_read(&cl->remaining); do { - old = v; - if (!old || (old & CLOSURE_RUNNING)) + if (!(old & CLOSURE_SLEEPING)) goto success; - new = old + CLOSURE_REMAINING_INITIALIZER; - } while ((v = atomic_cmpxchg(&cl->remaining, old, new)) != old); + new = old + CLOSURE_REMAINING_INITIALIZER - CLOSURE_SLEEPING; + closure_val_checks(cl, new, CLOSURE_REMAINING_INITIALIZER - CLOSURE_SLEEPING); + } while (!atomic_try_cmpxchg(&cl->remaining, &old, new)); + ret = -ETIME; + break; } timeout = schedule_timeout(timeout); |