From ff35e5ef86fea1fa84eb7fdc939d0b1e3f1222bf Mon Sep 17 00:00:00 2001 From: Davidlohr Bueso Date: Tue, 30 Jun 2015 14:58:39 -0700 Subject: ipc,msg: provide barrier pairings for lockless receive We currently use a full barrier on the sender side to to avoid receiver tasks disappearing on us while still performing on the sender side wakeup. We lack however, the proper CPU-CPU interactions pairing on the receiver side which busy-waits for the message. Similarly, we do not need a full smp_mb, and can relax the semantics for the writer and reader sides of the message. This is safe as we are only ordering loads and stores to r_msg. And in both smp_wmb and smp_rmb, there are no stores after the calls _anyway_. This obviously applies for pipelined_send and expunge_all, for EIRDM when destroying a queue. Signed-off-by: Davidlohr Bueso Cc: Manfred Spraul Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- ipc/msg.c | 48 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 10 deletions(-) (limited to 'ipc/msg.c') diff --git a/ipc/msg.c b/ipc/msg.c index 2b6fdbb9e0e9..a9c3c519490a 100644 --- a/ipc/msg.c +++ b/ipc/msg.c @@ -196,7 +196,7 @@ static void expunge_all(struct msg_queue *msq, int res) * or dealing with -EAGAIN cases. See lockless receive part 1 * and 2 in do_msgrcv(). */ - smp_mb(); + smp_wmb(); /* barrier (B) */ msr->r_msg = ERR_PTR(res); } } @@ -580,7 +580,8 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) /* initialize pipelined send ordering */ msr->r_msg = NULL; wake_up_process(msr->r_tsk); - smp_mb(); /* see barrier comment below */ + /* barrier (B) see barrier comment below */ + smp_wmb(); msr->r_msg = ERR_PTR(-E2BIG); } else { msr->r_msg = NULL; @@ -589,11 +590,12 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) wake_up_process(msr->r_tsk); /* * Ensure that the wakeup is visible before - * setting r_msg, as the receiving end depends - * on it. See lockless receive part 1 and 2 in - * do_msgrcv(). + * setting r_msg, as the receiving can otherwise + * exit - once r_msg is set, the receiver can + * continue. See lockless receive part 1 and 2 + * in do_msgrcv(). Barrier (B). */ - smp_mb(); + smp_wmb(); msr->r_msg = msg; return 1; @@ -932,12 +934,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl /* Lockless receive, part 2: * Wait until pipelined_send or expunge_all are outside of * wake_up_process(). There is a race with exit(), see - * ipc/mqueue.c for the details. + * ipc/mqueue.c for the details. The correct serialization + * ensures that a receiver cannot continue without the wakeup + * being visibible _before_ setting r_msg: + * + * CPU 0 CPU 1 + * + * smp_rmb(); (A) <-- pair -. + * r_msg> | msr->r_msg = NULL; + * | wake_up_process(); + * `------> smp_wmb(); (B) + * msr->r_msg = msg; + * + * Where (A) orders the message value read and where (B) orders + * the write to the r_msg -- done in both pipelined_send and + * expunge_all. */ - msg = (struct msg_msg *)msr_d.r_msg; - while (msg == NULL) { - cpu_relax(); + for (;;) { + /* + * Pairs with writer barrier in pipelined_send + * or expunge_all. + */ + smp_rmb(); /* barrier (A) */ msg = (struct msg_msg *)msr_d.r_msg; + if (msg) + break; + + /* + * The cpu_relax() call is a compiler barrier + * which forces everything in this loop to be + * re-loaded. + */ + cpu_relax(); } /* Lockless receive, part 3: -- cgit v1.2.3 From 55b7ae50167efc9b1c4f8fb60a99478cd46a82f7 Mon Sep 17 00:00:00 2001 From: Davidlohr Bueso Date: Tue, 30 Jun 2015 14:58:42 -0700 Subject: ipc: rename ipc_obtain_object ... to ipc_obtain_object_idr, which is more meaningful and makes the code slightly easier to follow. Signed-off-by: Davidlohr Bueso Cc: Manfred Spraul Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- ipc/msg.c | 2 +- ipc/sem.c | 4 ++-- ipc/shm.c | 2 +- ipc/util.c | 8 ++++---- ipc/util.h | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) (limited to 'ipc/msg.c') diff --git a/ipc/msg.c b/ipc/msg.c index a9c3c519490a..66c4f567eb73 100644 --- a/ipc/msg.c +++ b/ipc/msg.c @@ -76,7 +76,7 @@ struct msg_sender { static inline struct msg_queue *msq_obtain_object(struct ipc_namespace *ns, int id) { - struct kern_ipc_perm *ipcp = ipc_obtain_object(&msg_ids(ns), id); + struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&msg_ids(ns), id); if (IS_ERR(ipcp)) return ERR_CAST(ipcp); diff --git a/ipc/sem.c b/ipc/sem.c index d1a6edd17eba..bc3d530cb23e 100644 --- a/ipc/sem.c +++ b/ipc/sem.c @@ -391,7 +391,7 @@ static inline struct sem_array *sem_obtain_lock(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp; struct sem_array *sma; - ipcp = ipc_obtain_object(&sem_ids(ns), id); + ipcp = ipc_obtain_object_idr(&sem_ids(ns), id); if (IS_ERR(ipcp)) return ERR_CAST(ipcp); @@ -410,7 +410,7 @@ static inline struct sem_array *sem_obtain_lock(struct ipc_namespace *ns, static inline struct sem_array *sem_obtain_object(struct ipc_namespace *ns, int id) { - struct kern_ipc_perm *ipcp = ipc_obtain_object(&sem_ids(ns), id); + struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&sem_ids(ns), id); if (IS_ERR(ipcp)) return ERR_CAST(ipcp); diff --git a/ipc/shm.c b/ipc/shm.c index bd9c91f433c1..06e5cf2fe019 100644 --- a/ipc/shm.c +++ b/ipc/shm.c @@ -129,7 +129,7 @@ void __init shm_init(void) static inline struct shmid_kernel *shm_obtain_object(struct ipc_namespace *ns, int id) { - struct kern_ipc_perm *ipcp = ipc_obtain_object(&shm_ids(ns), id); + struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&shm_ids(ns), id); if (IS_ERR(ipcp)) return ERR_CAST(ipcp); diff --git a/ipc/util.c b/ipc/util.c index 537a41c7f633..3fdfabfdd9c3 100644 --- a/ipc/util.c +++ b/ipc/util.c @@ -555,7 +555,7 @@ void ipc64_perm_to_ipc_perm(struct ipc64_perm *in, struct ipc_perm *out) * Call inside the RCU critical section. * The ipc object is *not* locked on exit. */ -struct kern_ipc_perm *ipc_obtain_object(struct ipc_ids *ids, int id) +struct kern_ipc_perm *ipc_obtain_object_idr(struct ipc_ids *ids, int id) { struct kern_ipc_perm *out; int lid = ipcid_to_idx(id); @@ -581,7 +581,7 @@ struct kern_ipc_perm *ipc_lock(struct ipc_ids *ids, int id) struct kern_ipc_perm *out; rcu_read_lock(); - out = ipc_obtain_object(ids, id); + out = ipc_obtain_object_idr(ids, id); if (IS_ERR(out)) goto err1; @@ -605,7 +605,7 @@ err1: * @ids: ipc identifier set * @id: ipc id to look for * - * Similar to ipc_obtain_object() but also checks + * Similar to ipc_obtain_object_idr() but also checks * the ipc object reference counter. * * Call inside the RCU critical section. @@ -613,7 +613,7 @@ err1: */ struct kern_ipc_perm *ipc_obtain_object_check(struct ipc_ids *ids, int id) { - struct kern_ipc_perm *out = ipc_obtain_object(ids, id); + struct kern_ipc_perm *out = ipc_obtain_object_idr(ids, id); if (IS_ERR(out)) goto out; diff --git a/ipc/util.h b/ipc/util.h index 1a5a0fcd099c..3a8a5a0eca62 100644 --- a/ipc/util.h +++ b/ipc/util.h @@ -132,7 +132,7 @@ void ipc_rcu_putref(void *ptr, void (*func)(struct rcu_head *head)); void ipc_rcu_free(struct rcu_head *head); struct kern_ipc_perm *ipc_lock(struct ipc_ids *, int); -struct kern_ipc_perm *ipc_obtain_object(struct ipc_ids *ids, int id); +struct kern_ipc_perm *ipc_obtain_object_idr(struct ipc_ids *ids, int id); void kernel_to_ipc64_perm(struct kern_ipc_perm *in, struct ipc64_perm *out); void ipc64_perm_to_ipc_perm(struct ipc64_perm *in, struct ipc_perm *out); -- cgit v1.2.3