summary | refs | log | tree | commit | diff
path: root/net/sunrpc
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- net/sunrpc/Makefile 3
-rw-r--r-- net/sunrpc/auth.c 9
-rw-r--r-- net/sunrpc/auth_generic.c 13
-rw-r--r-- net/sunrpc/auth_gss/auth_gss.c 18
-rw-r--r-- net/sunrpc/auth_gss/gss_krb5_crypto.c 356
-rw-r--r-- net/sunrpc/auth_gss/gss_krb5_keys.c 12
-rw-r--r-- net/sunrpc/auth_gss/gss_krb5_mech.c 90
-rw-r--r-- net/sunrpc/auth_gss/gss_krb5_seqnum.c 22
-rw-r--r-- net/sunrpc/auth_gss/gss_krb5_wrap.c 28
-rw-r--r-- net/sunrpc/auth_gss/svcauth_gss.c 9
-rw-r--r-- net/sunrpc/auth_null.c 4
-rw-r--r-- net/sunrpc/auth_unix.c 12
-rw-r--r-- net/sunrpc/cache.c 10
-rw-r--r-- net/sunrpc/clnt.c 370
-rw-r--r-- net/sunrpc/rpc_pipe.c 4
-rw-r--r-- net/sunrpc/rpcb_clnt.c 10
-rw-r--r-- net/sunrpc/sched.c 2
-rw-r--r-- net/sunrpc/socklib.c 8
-rw-r--r-- net/sunrpc/svc_xprt.c 25
-rw-r--r-- net/sunrpc/svcsock.c 8
-rw-r--r-- net/sunrpc/xdr.c 52
-rw-r--r-- net/sunrpc/xprt.c 42
-rw-r--r-- net/sunrpc/xprtmultipath.c 475
-rw-r--r-- net/sunrpc/xprtrdma/backchannel.c 16
-rw-r--r-- net/sunrpc/xprtrdma/fmr_ops.c 160
-rw-r--r-- net/sunrpc/xprtrdma/frwr_ops.c 317
-rw-r--r-- net/sunrpc/xprtrdma/physical_ops.c 40
-rw-r--r-- net/sunrpc/xprtrdma/rpc_rdma.c 625
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_backchannel.c 17
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_marshal.c 88
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_recvfrom.c 94
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_sendto.c 220
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_transport.c 462
-rw-r--r-- net/sunrpc/xprtrdma/transport.c 16
-rw-r--r-- net/sunrpc/xprtrdma/verbs.c 280
-rw-r--r-- net/sunrpc/xprtrdma/xprt_rdma.h 61
-rw-r--r-- net/sunrpc/xprtsock.c 26
37 files changed, 2479 insertions, 1525 deletions
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index b512fbd9d79a..ea7ffa12e0f9 100644
--- a/net/sunrpc/Makefile
+++ b/net/sunrpc/Makefile
@@ -12,7 +12,8 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \
svc.o svcsock.o svcauth.o svcauth_unix.o \
addr.o rpcb_clnt.o timer.o xdr.o \
sunrpc_syms.o cache.o rpc_pipe.o \
- svc_xprt.o
+ svc_xprt.o \
+ xprtmultipath.o
sunrpc-$(CONFIG_SUNRPC_DEBUG) += debugfs.o
sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o
sunrpc-$(CONFIG_PROC_FS) += stats.o
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index 02f53674dc39..040ff627c18a 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -543,7 +543,7 @@ rpcauth_cache_enforce_limit(void)
*/
struct rpc_cred *
rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred,
- int flags)
+ int flags, gfp_t gfp)
{
LIST_HEAD(free);
struct rpc_cred_cache *cache = auth->au_credcache;
@@ -580,7 +580,7 @@ rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred,
if (flags & RPCAUTH_LOOKUP_RCU)
return ERR_PTR(-ECHILD);
- new = auth->au_ops->crcreate(auth, acred, flags);
+ new = auth->au_ops->crcreate(auth, acred, flags, gfp);
if (IS_ERR(new)) {
cred = new;
goto out;
@@ -703,8 +703,7 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags)
new = rpcauth_bind_new_cred(task, lookupflags);
if (IS_ERR(new))
return PTR_ERR(new);
- if (req->rq_cred != NULL)
- put_rpccred(req->rq_cred);
+ put_rpccred(req->rq_cred);
req->rq_cred = new;
return 0;
}
@@ -712,6 +711,8 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags)
void
put_rpccred(struct rpc_cred *cred)
{
+ if (cred == NULL)
+ return;
/* Fast path for unhashed credentials */
if (test_bit(RPCAUTH_CRED_HASHED, &cred->cr_flags) == 0) {
if (atomic_dec_and_test(&cred->cr_count))
diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c
index 41248b1820c7..54dd3fdead54 100644
--- a/net/sunrpc/auth_generic.c
+++ b/net/sunrpc/auth_generic.c
@@ -38,6 +38,13 @@ struct rpc_cred *rpc_lookup_cred(void)
}
EXPORT_SYMBOL_GPL(rpc_lookup_cred);
+struct rpc_cred *
+rpc_lookup_generic_cred(struct auth_cred *acred, int flags, gfp_t gfp)
+{
+ return rpcauth_lookup_credcache(&generic_auth, acred, flags, gfp);
+}
+EXPORT_SYMBOL_GPL(rpc_lookup_generic_cred);
+
struct rpc_cred *rpc_lookup_cred_nonblock(void)
{
return rpcauth_lookupcred(&generic_auth, RPCAUTH_LOOKUP_RCU);
@@ -77,15 +84,15 @@ static struct rpc_cred *generic_bind_cred(struct rpc_task *task,
static struct rpc_cred *
generic_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
{
- return rpcauth_lookup_credcache(&generic_auth, acred, flags);
+ return rpcauth_lookup_credcache(&generic_auth, acred, flags, GFP_KERNEL);
}
static struct rpc_cred *
-generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
+generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
{
struct generic_cred *gcred;
- gcred = kmalloc(sizeof(*gcred), GFP_KERNEL);
+ gcred = kmalloc(sizeof(*gcred), gfp);
if (gcred == NULL)
return ERR_PTR(-ENOMEM);
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index cabf586f47d7..e64ae93d5b4f 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1181,12 +1181,12 @@ static struct rpc_auth *
gss_create(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
{
struct gss_auth *gss_auth;
- struct rpc_xprt *xprt = rcu_access_pointer(clnt->cl_xprt);
+ struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch);
while (clnt != clnt->cl_parent) {
struct rpc_clnt *parent = clnt->cl_parent;
/* Find the original parent for this transport */
- if (rcu_access_pointer(parent->cl_xprt) != xprt)
+ if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps)
break;
clnt = parent;
}
@@ -1299,11 +1299,11 @@ gss_destroy_cred(struct rpc_cred *cred)
static struct rpc_cred *
gss_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
{
- return rpcauth_lookup_credcache(auth, acred, flags);
+ return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS);
}
static struct rpc_cred *
-gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
+gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
{
struct gss_auth *gss_auth = container_of(auth, struct gss_auth, rpc_auth);
struct gss_cred *cred = NULL;
@@ -1313,7 +1313,7 @@ gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
__func__, from_kuid(&init_user_ns, acred->uid),
auth->au_flavor);
- if (!(cred = kzalloc(sizeof(*cred), GFP_NOFS)))
+ if (!(cred = kzalloc(sizeof(*cred), gfp)))
goto out_err;
rpcauth_init_cred(&cred->gc_base, acred, auth, &gss_credops);
@@ -1728,8 +1728,8 @@ alloc_enc_pages(struct rpc_rqst *rqstp)
return 0;
}
- first = snd_buf->page_base >> PAGE_CACHE_SHIFT;
- last = (snd_buf->page_base + snd_buf->page_len - 1) >> PAGE_CACHE_SHIFT;
+ first = snd_buf->page_base >> PAGE_SHIFT;
+ last = (snd_buf->page_base + snd_buf->page_len - 1) >> PAGE_SHIFT;
rqstp->rq_enc_pages_num = last - first + 1 + 1;
rqstp->rq_enc_pages
= kmalloc(rqstp->rq_enc_pages_num * sizeof(struct page *),
@@ -1775,10 +1775,10 @@ gss_wrap_req_priv(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
status = alloc_enc_pages(rqstp);
if (status)
return status;
- first = snd_buf->page_base >> PAGE_CACHE_SHIFT;
+ first = snd_buf->page_base >> PAGE_SHIFT;
inpages = snd_buf->pages + first;
snd_buf->pages = rqstp->rq_enc_pages;
- snd_buf->page_base -= first << PAGE_CACHE_SHIFT;
+ snd_buf->page_base -= first << PAGE_SHIFT;
/*
* Give the tail its own page, in case we need extra space in the
* head when wrapping:
diff --git a/net/sunrpc/auth_gss/gss_krb5_crypto.c b/net/sunrpc/auth_gss/gss_krb5_crypto.c
index fee3c15a4b52..244245bcbbd2 100644
--- a/net/sunrpc/auth_gss/gss_krb5_crypto.c
+++ b/net/sunrpc/auth_gss/gss_krb5_crypto.c
@@ -34,11 +34,12 @@
* WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
*/
+#include <crypto/hash.h>
+#include <crypto/skcipher.h>
#include <linux/err.h>
#include <linux/types.h>
#include <linux/mm.h>
#include <linux/scatterlist.h>
-#include <linux/crypto.h>
#include <linux/highmem.h>
#include <linux/pagemap.h>
#include <linux/random.h>
@@ -51,7 +52,7 @@
u32
krb5_encrypt(
- struct crypto_blkcipher *tfm,
+ struct crypto_skcipher *tfm,
void * iv,
void * in,
void * out,
@@ -60,24 +61,29 @@ krb5_encrypt(
u32 ret = -EINVAL;
struct scatterlist sg[1];
u8 local_iv[GSS_KRB5_MAX_BLOCKSIZE] = {0};
- struct blkcipher_desc desc = { .tfm = tfm, .info = local_iv };
+ SKCIPHER_REQUEST_ON_STACK(req, tfm);
- if (length % crypto_blkcipher_blocksize(tfm) != 0)
+ if (length % crypto_skcipher_blocksize(tfm) != 0)
goto out;
- if (crypto_blkcipher_ivsize(tfm) > GSS_KRB5_MAX_BLOCKSIZE) {
+ if (crypto_skcipher_ivsize(tfm) > GSS_KRB5_MAX_BLOCKSIZE) {
dprintk("RPC: gss_k5encrypt: tfm iv size too large %d\n",
- crypto_blkcipher_ivsize(tfm));
+ crypto_skcipher_ivsize(tfm));
goto out;
}
if (iv)
- memcpy(local_iv, iv, crypto_blkcipher_ivsize(tfm));
+ memcpy(local_iv, iv, crypto_skcipher_ivsize(tfm));
memcpy(out, in, length);
sg_init_one(sg, out, length);
- ret = crypto_blkcipher_encrypt_iv(&desc, sg, sg, length);
+ skcipher_request_set_tfm(req, tfm);
+ skcipher_request_set_callback(req, 0, NULL, NULL);
+ skcipher_request_set_crypt(req, sg, sg, length, local_iv);
+
+ ret = crypto_skcipher_encrypt(req);
+ skcipher_request_zero(req);
out:
dprintk("RPC: krb5_encrypt returns %d\n", ret);
return ret;
@@ -85,7 +91,7 @@ out:
u32
krb5_decrypt(
- struct crypto_blkcipher *tfm,
+ struct crypto_skcipher *tfm,
void * iv,
void * in,
void * out,
@@ -94,23 +100,28 @@ krb5_decrypt(
u32 ret = -EINVAL;
struct scatterlist sg[1];
u8 local_iv[GSS_KRB5_MAX_BLOCKSIZE] = {0};
- struct blkcipher_desc desc = { .tfm = tfm, .info = local_iv };
+ SKCIPHER_REQUEST_ON_STACK(req, tfm);
- if (length % crypto_blkcipher_blocksize(tfm) != 0)
+ if (length % crypto_skcipher_blocksize(tfm) != 0)
goto out;
- if (crypto_blkcipher_ivsize(tfm) > GSS_KRB5_MAX_BLOCKSIZE) {
+ if (crypto_skcipher_ivsize(tfm) > GSS_KRB5_MAX_BLOCKSIZE) {
dprintk("RPC: gss_k5decrypt: tfm iv size too large %d\n",
- crypto_blkcipher_ivsize(tfm));
+ crypto_skcipher_ivsize(tfm));
goto out;
}
if (iv)
- memcpy(local_iv,iv, crypto_blkcipher_ivsize(tfm));
+ memcpy(local_iv,iv, crypto_skcipher_ivsize(tfm));
memcpy(out, in, length);
sg_init_one(sg, out, length);
- ret = crypto_blkcipher_decrypt_iv(&desc, sg, sg, length);
+ skcipher_request_set_tfm(req, tfm);
+ skcipher_request_set_callback(req, 0, NULL, NULL);
+ skcipher_request_set_crypt(req, sg, sg, length, local_iv);
+
+ ret = crypto_skcipher_decrypt(req);
+ skcipher_request_zero(req);
out:
dprintk("RPC: gss_k5decrypt returns %d\n",ret);
return ret;
@@ -119,9 +130,11 @@ out:
static int
checksummer(struct scatterlist *sg, void *data)
{
- struct hash_desc *desc = data;
+ struct ahash_request *req = data;
+
+ ahash_request_set_crypt(req, sg, NULL, sg->length);
- return crypto_hash_update(desc, sg, sg->length);
+ return crypto_ahash_update(req);
}
static int
@@ -152,13 +165,13 @@ make_checksum_hmac_md5(struct krb5_ctx *kctx, char *header, int hdrlen,
struct xdr_buf *body, int body_offset, u8 *cksumkey,
unsigned int usage, struct xdr_netobj *cksumout)
{
- struct hash_desc desc;
struct scatterlist sg[1];
int err;
u8 checksumdata[GSS_KRB5_MAX_CKSUM_LEN];
u8 rc4salt[4];
- struct crypto_hash *md5;
- struct crypto_hash *hmac_md5;
+ struct crypto_ahash *md5;
+ struct crypto_ahash *hmac_md5;
+ struct ahash_request *req;
if (cksumkey == NULL)
return GSS_S_FAILURE;
@@ -174,61 +187,79 @@ make_checksum_hmac_md5(struct krb5_ctx *kctx, char *header, int hdrlen,
return GSS_S_FAILURE;
}
- md5 = crypto_alloc_hash("md5", 0, CRYPTO_ALG_ASYNC);
+ md5 = crypto_alloc_ahash("md5", 0, CRYPTO_ALG_ASYNC);
if (IS_ERR(md5))
return GSS_S_FAILURE;
- hmac_md5 = crypto_alloc_hash(kctx->gk5e->cksum_name, 0,
- CRYPTO_ALG_ASYNC);
+ hmac_md5 = crypto_alloc_ahash(kctx->gk5e->cksum_name, 0,
+ CRYPTO_ALG_ASYNC);
if (IS_ERR(hmac_md5)) {
- crypto_free_hash(md5);
+ crypto_free_ahash(md5);
+ return GSS_S_FAILURE;
+ }
+
+ req = ahash_request_alloc(md5, GFP_KERNEL);
+ if (!req) {
+ crypto_free_ahash(hmac_md5);
+ crypto_free_ahash(md5);
return GSS_S_FAILURE;
}
- desc.tfm = md5;
- desc.flags = CRYPTO_TFM_REQ_MAY_SLEEP;
+ ahash_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
- err = crypto_hash_init(&desc);
+ err = crypto_ahash_init(req);
if (err)
goto out;
sg_init_one(sg, rc4salt, 4);
- err = crypto_hash_update(&desc, sg, 4);
+ ahash_request_set_crypt(req, sg, NULL, 4);
+ err = crypto_ahash_update(req);
if (err)
goto out;
sg_init_one(sg, header, hdrlen);
- err = crypto_hash_update(&desc, sg, hdrlen);
+ ahash_request_set_crypt(req, sg, NULL, hdrlen);
+ err = crypto_ahash_update(req);
if (err)
goto out;
err = xdr_process_buf(body, body_offset, body->len - body_offset,
- checksummer, &desc);
+ checksummer, req);
if (err)
goto out;
- err = crypto_hash_final(&desc, checksumdata);
+ ahash_request_set_crypt(req, NULL, checksumdata, 0);
+ err = crypto_ahash_final(req);
if (err)
goto out;
- desc.tfm = hmac_md5;
- desc.flags = CRYPTO_TFM_REQ_MAY_SLEEP;
+ ahash_request_free(req);
+ req = ahash_request_alloc(hmac_md5, GFP_KERNEL);
+ if (!req) {
+ crypto_free_ahash(hmac_md5);
+ crypto_free_ahash(md5);
+ return GSS_S_FAILURE;
+ }
+
+ ahash_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
- err = crypto_hash_init(&desc);
+ err = crypto_ahash_init(req);
if (err)
goto out;
- err = crypto_hash_setkey(hmac_md5, cksumkey, kctx->gk5e->keylength);
+ err = crypto_ahash_setkey(hmac_md5, cksumkey, kctx->gk5e->keylength);
if (err)
goto out;
- sg_init_one(sg, checksumdata, crypto_hash_digestsize(md5));
- err = crypto_hash_digest(&desc, sg, crypto_hash_digestsize(md5),
- checksumdata);
+ sg_init_one(sg, checksumdata, crypto_ahash_digestsize(md5));
+ ahash_request_set_crypt(req, sg, checksumdata,
+ crypto_ahash_digestsize(md5));
+ err = crypto_ahash_digest(req);
if (err)
goto out;
memcpy(cksumout->data, checksumdata, kctx->gk5e->cksumlength);
cksumout->len = kctx->gk5e->cksumlength;
out:
- crypto_free_hash(md5);
- crypto_free_hash(hmac_md5);
+ ahash_request_free(req);
+ crypto_free_ahash(md5);
+ crypto_free_ahash(hmac_md5);
return err ? GSS_S_FAILURE : 0;
}
@@ -242,7 +273,8 @@ make_checksum(struct krb5_ctx *kctx, char *header, int hdrlen,
struct xdr_buf *body, int body_offset, u8 *cksumkey,
unsigned int usage, struct xdr_netobj *cksumout)
{
- struct hash_desc desc;
+ struct crypto_ahash *tfm;
+ struct ahash_request *req;
struct scatterlist sg[1];
int err;
u8 checksumdata[GSS_KRB5_MAX_CKSUM_LEN];
@@ -259,32 +291,41 @@ make_checksum(struct krb5_ctx *kctx, char *header, int hdrlen,
return GSS_S_FAILURE;
}
- desc.tfm = crypto_alloc_hash(kctx->gk5e->cksum_name, 0, CRYPTO_ALG_ASYNC);
- if (IS_ERR(desc.tfm))
+ tfm = crypto_alloc_ahash(kctx->gk5e->cksum_name, 0, CRYPTO_ALG_ASYNC);
+ if (IS_ERR(tfm))
return GSS_S_FAILURE;
- desc.flags = CRYPTO_TFM_REQ_MAY_SLEEP;
- checksumlen = crypto_hash_digestsize(desc.tfm);
+ req = ahash_request_alloc(tfm, GFP_KERNEL);
+ if (!req) {
+ crypto_free_ahash(tfm);
+ return GSS_S_FAILURE;
+ }
+
+ ahash_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+
+ checksumlen = crypto_ahash_digestsize(tfm);
if (cksumkey != NULL) {
- err = crypto_hash_setkey(desc.tfm, cksumkey,
- kctx->gk5e->keylength);
+ err = crypto_ahash_setkey(tfm, cksumkey,
+ kctx->gk5e->keylength);
if (err)
goto out;
}
- err = crypto_hash_init(&desc);
+ err = crypto_ahash_init(req);
if (err)
goto out;
sg_init_one(sg, header, hdrlen);
- err = crypto_hash_update(&desc, sg, hdrlen);
+ ahash_request_set_crypt(req, sg, NULL, hdrlen);
+ err = crypto_ahash_update(req);
if (err)
goto out;
err = xdr_process_buf(body, body_offset, body->len - body_offset,
- checksummer, &desc);
+ checksummer, req);
if (err)
goto out;
- err = crypto_hash_final(&desc, checksumdata);
+ ahash_request_set_crypt(req, NULL, checksumdata, 0);
+ err = crypto_ahash_final(req);
if (err)
goto out;
@@ -307,7 +348,8 @@ make_checksum(struct krb5_ctx *kctx, char *header, int hdrlen,
}
cksumout->len = kctx->gk5e->cksumlength;
out:
- crypto_free_hash(desc.tfm);
+ ahash_request_free(req);
+ crypto_free_ahash(tfm);
return err ? GSS_S_FAILURE : 0;
}
@@ -323,7 +365,8 @@ make_checksum_v2(struct krb5_ctx *kctx, char *header, int hdrlen,
struct xdr_buf *body, int body_offset, u8 *cksumkey,
unsigned int usage, struct xdr_netobj *cksumout)
{
- struct hash_desc desc;
+ struct crypto_ahash *tfm;
+ struct ahash_request *req;
struct scatterlist sg[1];
int err;
u8 checksumdata[GSS_KRB5_MAX_CKSUM_LEN];
@@ -340,31 +383,39 @@ make_checksum_v2(struct krb5_ctx *kctx, char *header, int hdrlen,
return GSS_S_FAILURE;
}
- desc.tfm = crypto_alloc_hash(kctx->gk5e->cksum_name, 0,
- CRYPTO_ALG_ASYNC);
- if (IS_ERR(desc.tfm))
+ tfm = crypto_alloc_ahash(kctx->gk5e->cksum_name, 0, CRYPTO_ALG_ASYNC);
+ if (IS_ERR(tfm))
return GSS_S_FAILURE;
- checksumlen = crypto_hash_digestsize(desc.tfm);
- desc.flags = CRYPTO_TFM_REQ_MAY_SLEEP;
+ checksumlen = crypto_ahash_digestsize(tfm);
+
+ req = ahash_request_alloc(tfm, GFP_KERNEL);
+ if (!req) {
+ crypto_free_ahash(tfm);
+ return GSS_S_FAILURE;
+ }
- err = crypto_hash_setkey(desc.tfm, cksumkey, kctx->gk5e->keylength);
+ ahash_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+
+ err = crypto_ahash_setkey(tfm, cksumkey, kctx->gk5e->keylength);
if (err)
goto out;
- err = crypto_hash_init(&desc);
+ err = crypto_ahash_init(req);
if (err)
goto out;
err = xdr_process_buf(body, body_offset, body->len - body_offset,
- checksummer, &desc);
+ checksummer, req);
if (err)
goto out;
if (header != NULL) {
sg_init_one(sg, header, hdrlen);
- err = crypto_hash_update(&desc, sg, hdrlen);
+ ahash_request_set_crypt(req, sg, NULL, hdrlen);
+ err = crypto_ahash_update(req);
if (err)
goto out;
}
- err = crypto_hash_final(&desc, checksumdata);
+ ahash_request_set_crypt(req, NULL, checksumdata, 0);
+ err = crypto_ahash_final(req);
if (err)
goto out;
@@ -381,13 +432,14 @@ make_checksum_v2(struct krb5_ctx *kctx, char *header, int hdrlen,
break;
}
out:
- crypto_free_hash(desc.tfm);
+ ahash_request_free(req);
+ crypto_free_ahash(tfm);
return err ? GSS_S_FAILURE : 0;
}
struct encryptor_desc {
u8 iv[GSS_KRB5_MAX_BLOCKSIZE];
- struct blkcipher_desc desc;
+ struct skcipher_request *req;
int pos;
struct xdr_buf *outbuf;
struct page **pages;
@@ -402,6 +454,7 @@ encryptor(struct scatterlist *sg, void *data)
{
struct encryptor_desc *desc = data;
struct xdr_buf *outbuf = desc->outbuf;
+ struct crypto_skcipher *tfm = crypto_skcipher_reqtfm(desc->req);
struct page *in_page;
int thislen = desc->fraglen + sg->length;
int fraglen, ret;
@@ -414,7 +467,7 @@ encryptor(struct scatterlist *sg, void *data)
page_pos = desc->pos - outbuf->head[0].iov_len;
if (page_pos >= 0 && page_pos < outbuf->page_len) {
/* pages are not in place: */
- int i = (page_pos + outbuf->page_base) >> PAGE_CACHE_SHIFT;
+ int i = (page_pos + outbuf->page_base) >> PAGE_SHIFT;
in_page = desc->pages[i];
} else {
in_page = sg_page(sg);
@@ -427,7 +480,7 @@ encryptor(struct scatterlist *sg, void *data)
desc->fraglen += sg->length;
desc->pos += sg->length;
- fraglen = thislen & (crypto_blkcipher_blocksize(desc->desc.tfm) - 1);
+ fraglen = thislen & (crypto_skcipher_blocksize(tfm) - 1);
thislen -= fraglen;
if (thislen == 0)
@@ -436,8 +489,10 @@ encryptor(struct scatterlist *sg, void *data)
sg_mark_end(&desc->infrags[desc->fragno - 1]);
sg_mark_end(&desc->outfrags[desc->fragno - 1]);
- ret = crypto_blkcipher_encrypt_iv(&desc->desc, desc->outfrags,
- desc->infrags, thislen);
+ skcipher_request_set_crypt(desc->req, desc->infrags, desc->outfrags,
+ thislen, desc->iv);
+
+ ret = crypto_skcipher_encrypt(desc->req);
if (ret)
return ret;
@@ -459,18 +514,20 @@ encryptor(struct scatterlist *sg, void *data)
}
int
-gss_encrypt_xdr_buf(struct crypto_blkcipher *tfm, struct xdr_buf *buf,
+gss_encrypt_xdr_buf(struct crypto_skcipher *tfm, struct xdr_buf *buf,
int offset, struct page **pages)
{
int ret;
struct encryptor_desc desc;
+ SKCIPHER_REQUEST_ON_STACK(req, tfm);
+
+ BUG_ON((buf->len - offset) % crypto_skcipher_blocksize(tfm) != 0);
- BUG_ON((buf->len - offset) % crypto_blkcipher_blocksize(tfm) != 0);
+ skcipher_request_set_tfm(req, tfm);
+ skcipher_request_set_callback(req, 0, NULL, NULL);
memset(desc.iv, 0, sizeof(desc.iv));
- desc.desc.tfm = tfm;
- desc.desc.info = desc.iv;
- desc.desc.flags = 0;
+ desc.req = req;
desc.pos = offset;
desc.outbuf = buf;
desc.pages = pages;
@@ -481,12 +538,13 @@ gss_encrypt_xdr_buf(struct crypto_blkcipher *tfm, struct xdr_buf *buf,
sg_init_table(desc.outfrags, 4);
ret = xdr_process_buf(buf, offset, buf->len - offset, encryptor, &desc);
+ skcipher_request_zero(req);
return ret;
}
struct decryptor_desc {
u8 iv[GSS_KRB5_MAX_BLOCKSIZE];
- struct blkcipher_desc desc;
+ struct skcipher_request *req;
struct scatterlist frags[4];
int fragno;
int fraglen;
@@ -497,6 +555,7 @@ decryptor(struct scatterlist *sg, void *data)
{
struct decryptor_desc *desc = data;
int thislen = desc->fraglen + sg->length;
+ struct crypto_skcipher *tfm = crypto_skcipher_reqtfm(desc->req);
int fraglen, ret;
/* Worst case is 4 fragments: head, end of page 1, start
@@ -507,7 +566,7 @@ decryptor(struct scatterlist *sg, void *data)
desc->fragno++;
desc->fraglen += sg->length;
- fraglen = thislen & (crypto_blkcipher_blocksize(desc->desc.tfm) - 1);
+ fraglen = thislen & (crypto_skcipher_blocksize(tfm) - 1);
thislen -= fraglen;
if (thislen == 0)
@@ -515,8 +574,10 @@ decryptor(struct scatterlist *sg, void *data)
sg_mark_end(&desc->frags[desc->fragno - 1]);
- ret = crypto_blkcipher_decrypt_iv(&desc->desc, desc->frags,
- desc->frags, thislen);
+ skcipher_request_set_crypt(desc->req, desc->frags, desc->frags,
+ thislen, desc->iv);
+
+ ret = crypto_skcipher_decrypt(desc->req);
if (ret)
return ret;
@@ -535,24 +596,29 @@ decryptor(struct scatterlist *sg, void *data)
}
int
-gss_decrypt_xdr_buf(struct crypto_blkcipher *tfm, struct xdr_buf *buf,
+gss_decrypt_xdr_buf(struct crypto_skcipher *tfm, struct xdr_buf *buf,
int offset)
{
+ int ret;
struct decryptor_desc desc;
+ SKCIPHER_REQUEST_ON_STACK(req, tfm);
/* XXXJBF: */
- BUG_ON((buf->len - offset) % crypto_blkcipher_blocksize(tfm) != 0);
+ BUG_ON((buf->len - offset) % crypto_skcipher_blocksize(tfm) != 0);
+
+ skcipher_request_set_tfm(req, tfm);
+ skcipher_request_set_callback(req, 0, NULL, NULL);
memset(desc.iv, 0, sizeof(desc.iv));
- desc.desc.tfm = tfm;
- desc.desc.info = desc.iv;
- desc.desc.flags = 0;
+ desc.req = req;
desc.fragno = 0;
desc.fraglen = 0;
sg_init_table(desc.frags, 4);
- return xdr_process_buf(buf, offset, buf->len - offset, decryptor, &desc);
+ ret = xdr_process_buf(buf, offset, buf->len - offset, decryptor, &desc);
+ skcipher_request_zero(req);
+ return ret;
}
/*
@@ -594,12 +660,12 @@ xdr_extend_head(struct xdr_buf *buf, unsigned int base, unsigned int shiftlen)
}
static u32
-gss_krb5_cts_crypt(struct crypto_blkcipher *cipher, struct xdr_buf *buf,
+gss_krb5_cts_crypt(struct crypto_skcipher *cipher, struct xdr_buf *buf,
u32 offset, u8 *iv, struct page **pages, int encrypt)
{
u32 ret;
struct scatterlist sg[1];
- struct blkcipher_desc desc = { .tfm = cipher, .info = iv };
+ SKCIPHER_REQUEST_ON_STACK(req, cipher);
u8 data[GSS_KRB5_MAX_BLOCKSIZE * 2];
struct page **save_pages;
u32 len = buf->len - offset;
@@ -625,10 +691,16 @@ gss_krb5_cts_crypt(struct crypto_blkcipher *cipher, struct xdr_buf *buf,
sg_init_one(sg, data, len);
+ skcipher_request_set_tfm(req, cipher);
+ skcipher_request_set_callback(req, 0, NULL, NULL);
+ skcipher_request_set_crypt(req, sg, sg, len, iv);
+
if (encrypt)
- ret = crypto_blkcipher_encrypt_iv(&desc, sg, sg, len);
+ ret = crypto_skcipher_encrypt(req);
else
- ret = crypto_blkcipher_decrypt_iv(&desc, sg, sg, len);
+ ret = crypto_skcipher_decrypt(req);
+
+ skcipher_request_zero(req);
if (ret)
goto out;
@@ -647,7 +719,7 @@ gss_krb5_aes_encrypt(struct krb5_ctx *kctx, u32 offset,
struct xdr_netobj hmac;
u8 *cksumkey;
u8 *ecptr;
- struct crypto_blkcipher *cipher, *aux_cipher;
+ struct crypto_skcipher *cipher, *aux_cipher;
int blocksize;
struct page **save_pages;
int nblocks, nbytes;
@@ -666,7 +738,7 @@ gss_krb5_aes_encrypt(struct krb5_ctx *kctx, u32 offset,
cksumkey = kctx->acceptor_integ;
usage = KG_USAGE_ACCEPTOR_SEAL;
}
- blocksize = crypto_blkcipher_blocksize(cipher);
+ blocksize = crypto_skcipher_blocksize(cipher);
/* hide the gss token header and insert the confounder */
offset += GSS_KRB5_TOK_HDR_LEN;
@@ -719,20 +791,24 @@ gss_krb5_aes_encrypt(struct krb5_ctx *kctx, u32 offset,
memset(desc.iv, 0, sizeof(desc.iv));
if (cbcbytes) {
+ SKCIPHER_REQUEST_ON_STACK(req, aux_cipher);
+
desc.pos = offset + GSS_KRB5_TOK_HDR_LEN;
desc.fragno = 0;
desc.fraglen = 0;
desc.pages = pages;
desc.outbuf = buf;
- desc.desc.info = desc.iv;
- desc.desc.flags = 0;
- desc.desc.tfm = aux_cipher;
+ desc.req = req;
+
+ skcipher_request_set_tfm(req, aux_cipher);
+ skcipher_request_set_callback(req, 0, NULL, NULL);
sg_init_table(desc.infrags, 4);
sg_init_table(desc.outfrags, 4);
err = xdr_process_buf(buf, offset + GSS_KRB5_TOK_HDR_LEN,
cbcbytes, encryptor, &desc);
+ skcipher_request_zero(req);
if (err)
goto out_err;
}
@@ -763,7 +839,7 @@ gss_krb5_aes_decrypt(struct krb5_ctx *kctx, u32 offset, struct xdr_buf *buf,
struct xdr_buf subbuf;
u32 ret = 0;
u8 *cksum_key;
- struct crypto_blkcipher *cipher, *aux_cipher;
+ struct crypto_skcipher *cipher, *aux_cipher;
struct xdr_netobj our_hmac_obj;
u8 our_hmac[GSS_KRB5_MAX_CKSUM_LEN];
u8 pkt_hmac[GSS_KRB5_MAX_CKSUM_LEN];
@@ -782,7 +858,7 @@ gss_krb5_aes_decrypt(struct krb5_ctx *kctx, u32 offset, struct xdr_buf *buf,
cksum_key = kctx->initiator_integ;
usage = KG_USAGE_INITIATOR_SEAL;
}
- blocksize = crypto_blkcipher_blocksize(cipher);
+ blocksize = crypto_skcipher_blocksize(cipher);
/* create a segment skipping the header and leaving out the checksum */
@@ -799,15 +875,19 @@ gss_krb5_aes_decrypt(struct krb5_ctx *kctx, u32 offset, struct xdr_buf *buf,
memset(desc.iv, 0, sizeof(desc.iv));
if (cbcbytes) {
+ SKCIPHER_REQUEST_ON_STACK(req, aux_cipher);
+
desc.fragno = 0;
desc.fraglen = 0;
- desc.desc.info = desc.iv;
- desc.desc.flags = 0;
- desc.desc.tfm = aux_cipher;
+ desc.req = req;
+
+ skcipher_request_set_tfm(req, aux_cipher);
+ skcipher_request_set_callback(req, 0, NULL, NULL);
sg_init_table(desc.frags, 4);
ret = xdr_process_buf(&subbuf, 0, cbcbytes, decryptor, &desc);
+ skcipher_request_zero(req);
if (ret)
goto out_err;
}
@@ -850,61 +930,63 @@ out_err:
* Set the key of the given cipher.
*/
int
-krb5_rc4_setup_seq_key(struct krb5_ctx *kctx, struct crypto_blkcipher *cipher,
+krb5_rc4_setup_seq_key(struct krb5_ctx *kctx, struct crypto_skcipher *cipher,
unsigned char *cksum)
{
- struct crypto_hash *hmac;
- struct hash_desc desc;
- struct scatterlist sg[1];
+ struct crypto_shash *hmac;
+ struct shash_desc *desc;
u8 Kseq[GSS_KRB5_MAX_KEYLEN];
u32 zeroconstant = 0;
int err;
dprintk("%s: entered\n", __func__);
- hmac = crypto_alloc_hash(kctx->gk5e->cksum_name, 0, CRYPTO_ALG_ASYNC);
+ hmac = crypto_alloc_shash(kctx->gk5e->cksum_name, 0, 0);
if (IS_ERR(hmac)) {
dprintk("%s: error %ld, allocating hash '%s'\n",
__func__, PTR_ERR(hmac), kctx->gk5e->cksum_name);
return PTR_ERR(hmac);
}
- desc.tfm = hmac;
- desc.flags = 0;
+ desc = kmalloc(sizeof(*desc) + crypto_shash_descsize(hmac),
+ GFP_KERNEL);
+ if (!desc) {
+ dprintk("%s: failed to allocate shash descriptor for '%s'\n",
+ __func__, kctx->gk5e->cksum_name);
+ crypto_free_shash(hmac);
+ return -ENOMEM;
+ }
- err = crypto_hash_init(&desc);
- if (err)
- goto out_err;
+ desc->tfm = hmac;
+ desc->flags = 0;
/* Compute intermediate Kseq from session key */
- err = crypto_hash_setkey(hmac, kctx->Ksess, kctx->gk5e->keylength);
+ err = crypto_shash_setkey(hmac, kctx->Ksess, kctx->gk5e->keylength);
if (err)
goto out_err;
- sg_init_one(sg, &zeroconstant, 4);
- err = crypto_hash_digest(&desc, sg, 4, Kseq);
+ err = crypto_shash_digest(desc, (u8 *)&zeroconstant, 4, Kseq);
if (err)
goto out_err;
/* Compute final Kseq from the checksum and intermediate Kseq */
- err = crypto_hash_setkey(hmac, Kseq, kctx->gk5e->keylength);
+ err = crypto_shash_setkey(hmac, Kseq, kctx->gk5e->keylength);
if (err)
goto out_err;
- sg_set_buf(sg, cksum, 8);
-
- err = crypto_hash_digest(&desc, sg, 8, Kseq);
+ err = crypto_shash_digest(desc, cksum, 8, Kseq);
if (err)
goto out_err;
- err = crypto_blkcipher_setkey(cipher, Kseq, kctx->gk5e->keylength);
+ err = crypto_skcipher_setkey(cipher, Kseq, kctx->gk5e->keylength);
if (err)
goto out_err;
err = 0;
out_err:
- crypto_free_hash(hmac);
+ kzfree(desc);
+ crypto_free_shash(hmac);
dprintk("%s: returning %d\n", __func__, err);
return err;
}
@@ -914,12 +996,11 @@ out_err:
* Set the key of cipher kctx->enc.
*/
int
-krb5_rc4_setup_enc_key(struct krb5_ctx *kctx, struct crypto_blkcipher *cipher,
+krb5_rc4_setup_enc_key(struct krb5_ctx *kctx, struct crypto_skcipher *cipher,
s32 seqnum)
{
- struct crypto_hash *hmac;
- struct hash_desc desc;
- struct scatterlist sg[1];
+ struct crypto_shash *hmac;
+ struct shash_desc *desc;
u8 Kcrypt[GSS_KRB5_MAX_KEYLEN];
u8 zeroconstant[4] = {0};
u8 seqnumarray[4];
@@ -927,35 +1008,39 @@ krb5_rc4_setup_enc_key(struct krb5_ctx *kctx, struct crypto_blkcipher *cipher,
dprintk("%s: entered, seqnum %u\n", __func__, seqnum);
- hmac = crypto_alloc_hash(kctx->gk5e->cksum_name, 0, CRYPTO_ALG_ASYNC);
+ hmac = crypto_alloc_shash(kctx->gk5e->cksum_name, 0, 0);
if (IS_ERR(hmac)) {
dprintk("%s: error %ld, allocating hash '%s'\n",
__func__, PTR_ERR(hmac), kctx->gk5e->cksum_name);
return PTR_ERR(hmac);
}
- desc.tfm = hmac;
- desc.flags = 0;
+ desc = kmalloc(sizeof(*desc) + crypto_shash_descsize(hmac),
+ GFP_KERNEL);
+ if (!desc) {
+ dprintk("%s: failed to allocate shash descriptor for '%s'\n",
+ __func__, kctx->gk5e->cksum_name);
+ crypto_free_shash(hmac);
+ return -ENOMEM;
+ }
- err = crypto_hash_init(&desc);
- if (err)
- goto out_err;
+ desc->tfm = hmac;
+ desc->flags = 0;
/* Compute intermediate Kcrypt from session key */
for (i = 0; i < kctx->gk5e->keylength; i++)
Kcrypt[i] = kctx->Ksess[i] ^ 0xf0;
- err = crypto_hash_setkey(hmac, Kcrypt, kctx->gk5e->keylength);
+ err = crypto_shash_setkey(hmac, Kcrypt, kctx->gk5e->keylength);
if (err)
goto out_err;
- sg_init_one(sg, zeroconstant, 4);
- err = crypto_hash_digest(&desc, sg, 4, Kcrypt);
+ err = crypto_shash_digest(desc, zeroconstant, 4, Kcrypt);
if (err)
goto out_err;
/* Compute final Kcrypt from the seqnum and intermediate Kcrypt */
- err = crypto_hash_setkey(hmac, Kcrypt, kctx->gk5e->keylength);
+ err = crypto_shash_setkey(hmac, Kcrypt, kctx->gk5e->keylength);
if (err)
goto out_err;
@@ -964,20 +1049,19 @@ krb5_rc4_setup_enc_key(struct krb5_ctx *kctx, struct crypto_blkcipher *cipher,
seqnumarray[2] = (unsigned char) ((seqnum >> 8) & 0xff);
seqnumarray[3] = (unsigned char) ((seqnum >> 0) & 0xff);
- sg_set_buf(sg, seqnumarray, 4);
-
- err = crypto_hash_digest(&desc, sg, 4, Kcrypt);
+ err = crypto_shash_digest(desc, seqnumarray, 4, Kcrypt);
if (err)
goto out_err;
- err = crypto_blkcipher_setkey(cipher, Kcrypt, kctx->gk5e->keylength);
+ err = crypto_skcipher_setkey(cipher, Kcrypt, kctx->gk5e->keylength);
if (err)
goto out_err;
err = 0;
out_err:
- crypto_free_hash(hmac);
+ kzfree(desc);
+ crypto_free_shash(hmac);
dprintk("%s: returning %d\n", __func__, err);
return err;
}
diff --git a/net/sunrpc/auth_gss/gss_krb5_keys.c b/net/sunrpc/auth_gss/gss_krb5_keys.c
index 234fa8d0fd9b..870133146026 100644
--- a/net/sunrpc/auth_gss/gss_krb5_keys.c
+++ b/net/sunrpc/auth_gss/gss_krb5_keys.c
@@ -54,9 +54,9 @@
* WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
*/
+#include <crypto/skcipher.h>
#include <linux/err.h>
#include <linux/types.h>
-#include <linux/crypto.h>
#include <linux/sunrpc/gss_krb5.h>
#include <linux/sunrpc/xdr.h>
#include <linux/lcm.h>
@@ -147,7 +147,7 @@ u32 krb5_derive_key(const struct gss_krb5_enctype *gk5e,
size_t blocksize, keybytes, keylength, n;
unsigned char *inblockdata, *outblockdata, *rawkey;
struct xdr_netobj inblock, outblock;
- struct crypto_blkcipher *cipher;
+ struct crypto_skcipher *cipher;
u32 ret = EINVAL;
blocksize = gk5e->blocksize;
@@ -157,11 +157,11 @@ u32 krb5_derive_key(const struct gss_krb5_enctype *gk5e,
if ((inkey->len != keylength) || (outkey->len != keylength))
goto err_return;
- cipher = crypto_alloc_blkcipher(gk5e->encrypt_name, 0,
- CRYPTO_ALG_ASYNC);
+ cipher = crypto_alloc_skcipher(gk5e->encrypt_name, 0,
+ CRYPTO_ALG_ASYNC);
if (IS_ERR(cipher))
goto err_return;
- if (crypto_blkcipher_setkey(cipher, inkey->data, inkey->len))
+ if (crypto_skcipher_setkey(cipher, inkey->data, inkey->len))
goto err_return;
/* allocate and set up buffers */
@@ -238,7 +238,7 @@ err_free_in:
memset(inblockdata, 0, blocksize);
kfree(inblockdata);
err_free_cipher:
- crypto_free_blkcipher(cipher);
+ crypto_free_skcipher(cipher);
err_return:
return ret;
}
diff --git a/net/sunrpc/auth_gss/gss_krb5_mech.c b/net/sunrpc/auth_gss/gss_krb5_mech.c
index 28db442a0034..65427492b1c9 100644
--- a/net/sunrpc/auth_gss/gss_krb5_mech.c
+++ b/net/sunrpc/auth_gss/gss_krb5_mech.c
@@ -34,6 +34,8 @@
*
*/
+#include <crypto/hash.h>
+#include <crypto/skcipher.h>
#include <linux/err.h>
#include <linux/module.h>
#include <linux/init.h>
@@ -42,7 +44,6 @@
#include <linux/sunrpc/auth.h>
#include <linux/sunrpc/gss_krb5.h>
#include <linux/sunrpc/xdr.h>
-#include <linux/crypto.h>
#include <linux/sunrpc/gss_krb5_enctypes.h>
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -217,7 +218,7 @@ simple_get_netobj(const void *p, const void *end, struct xdr_netobj *res)
static inline const void *
get_key(const void *p, const void *end,
- struct krb5_ctx *ctx, struct crypto_blkcipher **res)
+ struct krb5_ctx *ctx, struct crypto_skcipher **res)
{
struct xdr_netobj key;
int alg;
@@ -245,7 +246,7 @@ get_key(const void *p, const void *end,
if (IS_ERR(p))
goto out_err;
- *res = crypto_alloc_blkcipher(ctx->gk5e->encrypt_name, 0,
+ *res = crypto_alloc_skcipher(ctx->gk5e->encrypt_name, 0,
CRYPTO_ALG_ASYNC);
if (IS_ERR(*res)) {
printk(KERN_WARNING "gss_kerberos_mech: unable to initialize "
@@ -253,7 +254,7 @@ get_key(const void *p, const void *end,
*res = NULL;
goto out_err_free_key;
}
- if (crypto_blkcipher_setkey(*res, key.data, key.len)) {
+ if (crypto_skcipher_setkey(*res, key.data, key.len)) {
printk(KERN_WARNING "gss_kerberos_mech: error setting key for "
"crypto algorithm %s\n", ctx->gk5e->encrypt_name);
goto out_err_free_tfm;
@@ -263,7 +264,7 @@ get_key(const void *p, const void *end,
return p;
out_err_free_tfm:
- crypto_free_blkcipher(*res);
+ crypto_free_skcipher(*res);
out_err_free_key:
kfree(key.data);
p = ERR_PTR(-EINVAL);
@@ -335,30 +336,30 @@ gss_import_v1_context(const void *p, const void *end, struct krb5_ctx *ctx)
return 0;
out_err_free_key2:
- crypto_free_blkcipher(ctx->seq);
+ crypto_free_skcipher(ctx->seq);
out_err_free_key1:
- crypto_free_blkcipher(ctx->enc);
+ crypto_free_skcipher(ctx->enc);
out_err_free_mech:
kfree(ctx->mech_used.data);
out_err:
return PTR_ERR(p);
}
-static struct crypto_blkcipher *
+static struct crypto_skcipher *
context_v2_alloc_cipher(struct krb5_ctx *ctx, const char *cname, u8 *key)
{
- struct crypto_blkcipher *cp;
+ struct crypto_skcipher *cp;
- cp = crypto_alloc_blkcipher(cname, 0, CRYPTO_ALG_ASYNC);
+ cp = crypto_alloc_skcipher(cname, 0, CRYPTO_ALG_ASYNC);
if (IS_ERR(cp)) {
dprintk("gss_kerberos_mech: unable to initialize "
"crypto algorithm %s\n", cname);
return NULL;
}
- if (crypto_blkcipher_setkey(cp, key, ctx->gk5e->keylength)) {
+ if (crypto_skcipher_setkey(cp, key, ctx->gk5e->keylength)) {
dprintk("gss_kerberos_mech: error setting key for "
"crypto algorithm %s\n", cname);
- crypto_free_blkcipher(cp);
+ crypto_free_skcipher(cp);
return NULL;
}
return cp;
@@ -412,9 +413,9 @@ context_derive_keys_des3(struct krb5_ctx *ctx, gfp_t gfp_mask)
return 0;
out_free_enc:
- crypto_free_blkcipher(ctx->enc);
+ crypto_free_skcipher(ctx->enc);
out_free_seq:
- crypto_free_blkcipher(ctx->seq);
+ crypto_free_skcipher(ctx->seq);
out_err:
return -EINVAL;
}
@@ -427,18 +428,17 @@ out_err:
static int
context_derive_keys_rc4(struct krb5_ctx *ctx)
{
- struct crypto_hash *hmac;
+ struct crypto_shash *hmac;
char sigkeyconstant[] = "signaturekey";
int slen = strlen(sigkeyconstant) + 1; /* include null terminator */
- struct hash_desc desc;
- struct scatterlist sg[1];
+ struct shash_desc *desc;
int err;
dprintk("RPC: %s: entered\n", __func__);
/*
* derive cksum (aka Ksign) key
*/
- hmac = crypto_alloc_hash(ctx->gk5e->cksum_name, 0, CRYPTO_ALG_ASYNC);
+ hmac = crypto_alloc_shash(ctx->gk5e->cksum_name, 0, 0);
if (IS_ERR(hmac)) {
dprintk("%s: error %ld allocating hash '%s'\n",
__func__, PTR_ERR(hmac), ctx->gk5e->cksum_name);
@@ -446,37 +446,41 @@ context_derive_keys_rc4(struct krb5_ctx *ctx)
goto out_err;
}
- err = crypto_hash_setkey(hmac, ctx->Ksess, ctx->gk5e->keylength);
+ err = crypto_shash_setkey(hmac, ctx->Ksess, ctx->gk5e->keylength);
if (err)
goto out_err_free_hmac;
- sg_init_table(sg, 1);
- sg_set_buf(sg, sigkeyconstant, slen);
- desc.tfm = hmac;
- desc.flags = 0;
-
- err = crypto_hash_init(&desc);
- if (err)
+ desc = kmalloc(sizeof(*desc) + crypto_shash_descsize(hmac),
+ GFP_KERNEL);
+ if (!desc) {
+ dprintk("%s: failed to allocate hash descriptor for '%s'\n",
+ __func__, ctx->gk5e->cksum_name);
+ err = -ENOMEM;
goto out_err_free_hmac;
+ }
+
+ desc->tfm = hmac;
+ desc->flags = 0;
- err = crypto_hash_digest(&desc, sg, slen, ctx->cksum);
+ err = crypto_shash_digest(desc, sigkeyconstant, slen, ctx->cksum);
+ kzfree(desc);
if (err)
goto out_err_free_hmac;
/*
- * allocate hash, and blkciphers for data and seqnum encryption
+ * allocate hash, and skciphers for data and seqnum encryption
*/
- ctx->enc = crypto_alloc_blkcipher(ctx->gk5e->encrypt_name, 0,
- CRYPTO_ALG_ASYNC);
+ ctx->enc = crypto_alloc_skcipher(ctx->gk5e->encrypt_name, 0,
+ CRYPTO_ALG_ASYNC);
if (IS_ERR(ctx->enc)) {
err = PTR_ERR(ctx->enc);
goto out_err_free_hmac;
}
- ctx->seq = crypto_alloc_blkcipher(ctx->gk5e->encrypt_name, 0,
- CRYPTO_ALG_ASYNC);
+ ctx->seq = crypto_alloc_skcipher(ctx->gk5e->encrypt_name, 0,
+ CRYPTO_ALG_ASYNC);
if (IS_ERR(ctx->seq)) {
- crypto_free_blkcipher(ctx->enc);
+ crypto_free_skcipher(ctx->enc);
err = PTR_ERR(ctx->seq);
goto out_err_free_hmac;
}
@@ -486,7 +490,7 @@ context_derive_keys_rc4(struct krb5_ctx *ctx)
err = 0;
out_err_free_hmac:
- crypto_free_hash(hmac);
+ crypto_free_shash(hmac);
out_err:
dprintk("RPC: %s: returning %d\n", __func__, err);
return err;
@@ -588,7 +592,7 @@ context_derive_keys_new(struct krb5_ctx *ctx, gfp_t gfp_mask)
context_v2_alloc_cipher(ctx, "cbc(aes)",
ctx->acceptor_seal);
if (ctx->acceptor_enc_aux == NULL) {
- crypto_free_blkcipher(ctx->initiator_enc_aux);
+ crypto_free_skcipher(ctx->initiator_enc_aux);
goto out_free_acceptor_enc;
}
}
@@ -596,9 +600,9 @@ context_derive_keys_new(struct krb5_ctx *ctx, gfp_t gfp_mask)
return 0;
out_free_acceptor_enc:
- crypto_free_blkcipher(ctx->acceptor_enc);
+ crypto_free_skcipher(ctx->acceptor_enc);
out_free_initiator_enc:
- crypto_free_blkcipher(ctx->initiator_enc);
+ crypto_free_skcipher(ctx->initiator_enc);
out_err:
return -EINVAL;
}
@@ -710,12 +714,12 @@ static void
gss_delete_sec_context_kerberos(void *internal_ctx) {
struct krb5_ctx *kctx = internal_ctx;
- crypto_free_blkcipher(kctx->seq);
- crypto_free_blkcipher(kctx->enc);
- crypto_free_blkcipher(kctx->acceptor_enc);
- crypto_free_blkcipher(kctx->initiator_enc);
- crypto_free_blkcipher(kctx->acceptor_enc_aux);
- crypto_free_blkcipher(kctx->initiator_enc_aux);
+ crypto_free_skcipher(kctx->seq);
+ crypto_free_skcipher(kctx->enc);
+ crypto_free_skcipher(kctx->acceptor_enc);
+ crypto_free_skcipher(kctx->initiator_enc);
+ crypto_free_skcipher(kctx->acceptor_enc_aux);
+ crypto_free_skcipher(kctx->initiator_enc_aux);
kfree(kctx->mech_used.data);
kfree(kctx);
}
diff --git a/net/sunrpc/auth_gss/gss_krb5_seqnum.c b/net/sunrpc/auth_gss/gss_krb5_seqnum.c
index 20d55c793eb6..c8b9082f4a9d 100644
--- a/net/sunrpc/auth_gss/gss_krb5_seqnum.c
+++ b/net/sunrpc/auth_gss/gss_krb5_seqnum.c
@@ -31,9 +31,9 @@
* PERFORMANCE OF THIS SOFTWARE.
*/
+#include <crypto/skcipher.h>
#include <linux/types.h>
#include <linux/sunrpc/gss_krb5.h>
-#include <linux/crypto.h>
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY RPCDBG_AUTH
@@ -43,13 +43,13 @@ static s32
krb5_make_rc4_seq_num(struct krb5_ctx *kctx, int direction, s32 seqnum,
unsigned char *cksum, unsigned char *buf)
{
- struct crypto_blkcipher *cipher;
+ struct crypto_skcipher *cipher;
unsigned char plain[8];
s32 code;
dprintk("RPC: %s:\n", __func__);
- cipher = crypto_alloc_blkcipher(kctx->gk5e->encrypt_name, 0,
- CRYPTO_ALG_ASYNC);
+ cipher = crypto_alloc_skcipher(kctx->gk5e->encrypt_name, 0,
+ CRYPTO_ALG_ASYNC);
if (IS_ERR(cipher))
return PTR_ERR(cipher);
@@ -68,12 +68,12 @@ krb5_make_rc4_seq_num(struct krb5_ctx *kctx, int direction, s32 seqnum,
code = krb5_encrypt(cipher, cksum, plain, buf, 8);
out:
- crypto_free_blkcipher(cipher);
+ crypto_free_skcipher(cipher);
return code;
}
s32
krb5_make_seq_num(struct krb5_ctx *kctx,
- struct crypto_blkcipher *key,
+ struct crypto_skcipher *key,
int direction,
u32 seqnum,
unsigned char *cksum, unsigned char *buf)
@@ -101,13 +101,13 @@ static s32
krb5_get_rc4_seq_num(struct krb5_ctx *kctx, unsigned char *cksum,
unsigned char *buf, int *direction, s32 *seqnum)
{
- struct crypto_blkcipher *cipher;
+ struct crypto_skcipher *cipher;
unsigned char plain[8];
s32 code;
dprintk("RPC: %s:\n", __func__);
- cipher = crypto_alloc_blkcipher(kctx->gk5e->encrypt_name, 0,
- CRYPTO_ALG_ASYNC);
+ cipher = crypto_alloc_skcipher(kctx->gk5e->encrypt_name, 0,
+ CRYPTO_ALG_ASYNC);
if (IS_ERR(cipher))
return PTR_ERR(cipher);
@@ -130,7 +130,7 @@ krb5_get_rc4_seq_num(struct krb5_ctx *kctx, unsigned char *cksum,
*seqnum = ((plain[0] << 24) | (plain[1] << 16) |
(plain[2] << 8) | (plain[3]));
out:
- crypto_free_blkcipher(cipher);
+ crypto_free_skcipher(cipher);
return code;
}
@@ -142,7 +142,7 @@ krb5_get_seq_num(struct krb5_ctx *kctx,
{
s32 code;
unsigned char plain[8];
- struct crypto_blkcipher *key = kctx->seq;
+ struct crypto_skcipher *key = kctx->seq;
dprintk("RPC: krb5_get_seq_num:\n");
diff --git a/net/sunrpc/auth_gss/gss_krb5_wrap.c b/net/sunrpc/auth_gss/gss_krb5_wrap.c
index ca7e92a32f84..a737c2da0837 100644
--- a/net/sunrpc/auth_gss/gss_krb5_wrap.c
+++ b/net/sunrpc/auth_gss/gss_krb5_wrap.c
@@ -28,12 +28,12 @@
* SUCH DAMAGES.
*/
+#include <crypto/skcipher.h>
#include <linux/types.h>
#include <linux/jiffies.h>
#include <linux/sunrpc/gss_krb5.h>
#include <linux/random.h>
#include <linux/pagemap.h>
-#include <linux/crypto.h>
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY RPCDBG_AUTH
@@ -79,9 +79,9 @@ gss_krb5_remove_padding(struct xdr_buf *buf, int blocksize)
len -= buf->head[0].iov_len;
if (len <= buf->page_len) {
unsigned int last = (buf->page_base + len - 1)
- >>PAGE_CACHE_SHIFT;
+ >>PAGE_SHIFT;
unsigned int offset = (buf->page_base + len - 1)
- & (PAGE_CACHE_SIZE - 1);
+ & (PAGE_SIZE - 1);
ptr = kmap_atomic(buf->pages[last]);
pad = *(ptr + offset);
kunmap_atomic(ptr);
@@ -174,7 +174,7 @@ gss_wrap_kerberos_v1(struct krb5_ctx *kctx, int offset,
now = get_seconds();
- blocksize = crypto_blkcipher_blocksize(kctx->enc);
+ blocksize = crypto_skcipher_blocksize(kctx->enc);
gss_krb5_add_padding(buf, offset, blocksize);
BUG_ON((buf->len - offset) % blocksize);
plainlen = conflen + buf->len - offset;
@@ -239,10 +239,10 @@ gss_wrap_kerberos_v1(struct krb5_ctx *kctx, int offset,
return GSS_S_FAILURE;
if (kctx->enctype == ENCTYPE_ARCFOUR_HMAC) {
- struct crypto_blkcipher *cipher;
+ struct crypto_skcipher *cipher;
int err;
- cipher = crypto_alloc_blkcipher(kctx->gk5e->encrypt_name, 0,
- CRYPTO_ALG_ASYNC);
+ cipher = crypto_alloc_skcipher(kctx->gk5e->encrypt_name, 0,
+ CRYPTO_ALG_ASYNC);
if (IS_ERR(cipher))
return GSS_S_FAILURE;
@@ -250,7 +250,7 @@ gss_wrap_kerberos_v1(struct krb5_ctx *kctx, int offset,
err = gss_encrypt_xdr_buf(cipher, buf,
offset + headlen - conflen, pages);
- crypto_free_blkcipher(cipher);
+ crypto_free_skcipher(cipher);
if (err)
return GSS_S_FAILURE;
} else {
@@ -327,18 +327,18 @@ gss_unwrap_kerberos_v1(struct krb5_ctx *kctx, int offset, struct xdr_buf *buf)
return GSS_S_BAD_SIG;
if (kctx->enctype == ENCTYPE_ARCFOUR_HMAC) {
- struct crypto_blkcipher *cipher;
+ struct crypto_skcipher *cipher;
int err;
- cipher = crypto_alloc_blkcipher(kctx->gk5e->encrypt_name, 0,
- CRYPTO_ALG_ASYNC);
+ cipher = crypto_alloc_skcipher(kctx->gk5e->encrypt_name, 0,
+ CRYPTO_ALG_ASYNC);
if (IS_ERR(cipher))
return GSS_S_FAILURE;
krb5_rc4_setup_enc_key(kctx, cipher, seqnum);
err = gss_decrypt_xdr_buf(cipher, buf, crypt_offset);
- crypto_free_blkcipher(cipher);
+ crypto_free_skcipher(cipher);
if (err)
return GSS_S_DEFECTIVE_TOKEN;
} else {
@@ -371,7 +371,7 @@ gss_unwrap_kerberos_v1(struct krb5_ctx *kctx, int offset, struct xdr_buf *buf)
/* Copy the data back to the right position. XXX: Would probably be
* better to copy and encrypt at the same time. */
- blocksize = crypto_blkcipher_blocksize(kctx->enc);
+ blocksize = crypto_skcipher_blocksize(kctx->enc);
data_start = ptr + (GSS_KRB5_TOK_HDR_LEN + kctx->gk5e->cksumlength) +
conflen;
orig_start = buf->head[0].iov_base + offset;
@@ -473,7 +473,7 @@ gss_wrap_kerberos_v2(struct krb5_ctx *kctx, u32 offset,
*ptr++ = 0xff;
be16ptr = (__be16 *)ptr;
- blocksize = crypto_blkcipher_blocksize(kctx->acceptor_enc);
+ blocksize = crypto_skcipher_blocksize(kctx->acceptor_enc);
*be16ptr++ = 0;
/* "inner" token header always uses 0 for RRC */
*be16ptr++ = 0;
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index 1095be9c80ab..e085f5ae1548 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -569,10 +569,9 @@ gss_svc_searchbyctx(struct cache_detail *cd, struct xdr_netobj *handle)
struct rsc *found;
memset(&rsci, 0, sizeof(rsci));
- if (dup_to_netobj(&rsci.handle, handle->data, handle->len))
- return NULL;
+ rsci.handle.data = handle->data;
+ rsci.handle.len = handle->len;
found = rsc_lookup(cd, &rsci);
- rsc_free(&rsci);
if (!found)
return NULL;
if (cache_check(cd, &found->h, NULL))
@@ -857,8 +856,8 @@ unwrap_integ_data(struct svc_rqst *rqstp, struct xdr_buf *buf, u32 seq, struct g
goto out;
if (svc_getnl(&buf->head[0]) != seq)
goto out;
- /* trim off the mic at the end before returning */
- xdr_buf_trim(buf, mic.len + 4);
+ /* trim off the mic and padding at the end before returning */
+ xdr_buf_trim(buf, round_up_to_quad(mic.len) + 4);
stat = 0;
out:
kfree(mic.data);
diff --git a/net/sunrpc/auth_null.c b/net/sunrpc/auth_null.c
index c2a2b584a056..8d9eb4d5ddd8 100644
--- a/net/sunrpc/auth_null.c
+++ b/net/sunrpc/auth_null.c
@@ -113,8 +113,8 @@ const struct rpc_authops authnull_ops = {
static
struct rpc_auth null_auth = {
- .au_cslack = 4,
- .au_rslack = 2,
+ .au_cslack = NUL_CALLSLACK,
+ .au_rslack = NUL_REPLYSLACK,
.au_ops = &authnull_ops,
.au_flavor = RPC_AUTH_NULL,
.au_count = ATOMIC_INIT(0),
diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c
index 548240dd15fc..9f65452b7cbc 100644
--- a/net/sunrpc/auth_unix.c
+++ b/net/sunrpc/auth_unix.c
@@ -23,8 +23,6 @@ struct unx_cred {
};
#define uc_uid uc_base.cr_uid
-#define UNX_WRITESLACK (21 + XDR_QUADLEN(UNX_MAXNODENAME))
-
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY RPCDBG_AUTH
#endif
@@ -54,11 +52,11 @@ unx_destroy(struct rpc_auth *auth)
static struct rpc_cred *
unx_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
{
- return rpcauth_lookup_credcache(auth, acred, flags);
+ return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS);
}
static struct rpc_cred *
-unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
+unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
{
struct unx_cred *cred;
unsigned int groups = 0;
@@ -68,7 +66,7 @@ unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
from_kuid(&init_user_ns, acred->uid),
from_kgid(&init_user_ns, acred->gid));
- if (!(cred = kmalloc(sizeof(*cred), GFP_NOFS)))
+ if (!(cred = kmalloc(sizeof(*cred), gfp)))
return ERR_PTR(-ENOMEM);
rpcauth_init_cred(&cred->uc_base, acred, auth, &unix_credops);
@@ -228,8 +226,8 @@ const struct rpc_authops authunix_ops = {
static
struct rpc_auth unix_auth = {
- .au_cslack = UNX_WRITESLACK,
- .au_rslack = 2, /* assume AUTH_NULL verf */
+ .au_cslack = UNX_CALLSLACK,
+ .au_rslack = NUL_REPLYSLACK,
.au_ops = &authunix_ops,
.au_flavor = RPC_AUTH_UNIX,
.au_count = ATOMIC_INIT(0),
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index 273bc3a35425..553bf95f7003 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -881,7 +881,7 @@ static ssize_t cache_downcall(struct address_space *mapping,
char *kaddr;
ssize_t ret = -ENOMEM;
- if (count >= PAGE_CACHE_SIZE)
+ if (count >= PAGE_SIZE)
goto out_slow;
page = find_or_create_page(mapping, 0, GFP_KERNEL);
@@ -892,7 +892,7 @@ static ssize_t cache_downcall(struct address_space *mapping,
ret = cache_do_downcall(kaddr, buf, count, cd);
kunmap(page);
unlock_page(page);
- page_cache_release(page);
+ put_page(page);
return ret;
out_slow:
return cache_slow_downcall(buf, count, cd);
@@ -1182,14 +1182,14 @@ int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
}
crq->q.reader = 0;
- crq->item = cache_get(h);
crq->buf = buf;
crq->len = 0;
crq->readers = 0;
spin_lock(&queue_lock);
- if (test_bit(CACHE_PENDING, &h->flags))
+ if (test_bit(CACHE_PENDING, &h->flags)) {
+ crq->item = cache_get(h);
list_add_tail(&crq->q.list, &detail->queue);
- else
+ } else
/* Lost a race, no longer PENDING, so don't enqueue */
ret = -EAGAIN;
spin_unlock(&queue_lock);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index b7f21044f4d8..2808d550d273 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -354,6 +354,7 @@ static void rpc_free_clid(struct rpc_clnt *clnt)
}
static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
+ struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt,
struct rpc_clnt *parent)
{
@@ -411,6 +412,8 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
}
rpc_clnt_set_transport(clnt, xprt, timeout);
+ xprt_iter_init(&clnt->cl_xpi, xps);
+ xprt_switch_put(xps);
clnt->cl_rtt = &clnt->cl_rtt_default;
rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
@@ -438,16 +441,33 @@ out_no_clid:
out_err:
rpciod_down();
out_no_rpciod:
+ xprt_switch_put(xps);
xprt_put(xprt);
return ERR_PTR(err);
}
-struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
+static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
struct rpc_xprt *xprt)
{
struct rpc_clnt *clnt = NULL;
-
- clnt = rpc_new_client(args, xprt, NULL);
+ struct rpc_xprt_switch *xps;
+
+ if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
+ WARN_ON(args->protocol != XPRT_TRANSPORT_BC_TCP);
+ xps = args->bc_xprt->xpt_bc_xps;
+ xprt_switch_get(xps);
+ } else {
+ xps = xprt_switch_alloc(xprt, GFP_KERNEL);
+ if (xps == NULL) {
+ xprt_put(xprt);
+ return ERR_PTR(-ENOMEM);
+ }
+ if (xprt->bc_xprt) {
+ xprt_switch_get(xps);
+ xprt->bc_xprt->xpt_bc_xps = xps;
+ }
+ }
+ clnt = rpc_new_client(args, xps, xprt, NULL);
if (IS_ERR(clnt))
return clnt;
@@ -474,7 +494,6 @@ struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
return clnt;
}
-EXPORT_SYMBOL_GPL(rpc_create_xprt);
/**
* rpc_create - create an RPC client and transport with one call
@@ -500,6 +519,15 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
};
char servername[48];
+ if (args->bc_xprt) {
+ WARN_ON(args->protocol != XPRT_TRANSPORT_BC_TCP);
+ xprt = args->bc_xprt->xpt_bc_xprt;
+ if (xprt) {
+ xprt_get(xprt);
+ return rpc_create_xprt(args, xprt);
+ }
+ }
+
if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
@@ -564,6 +592,7 @@ EXPORT_SYMBOL_GPL(rpc_create);
static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
struct rpc_clnt *clnt)
{
+ struct rpc_xprt_switch *xps;
struct rpc_xprt *xprt;
struct rpc_clnt *new;
int err;
@@ -571,13 +600,17 @@ static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
err = -ENOMEM;
rcu_read_lock();
xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
+ xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
rcu_read_unlock();
- if (xprt == NULL)
+ if (xprt == NULL || xps == NULL) {
+ xprt_put(xprt);
+ xprt_switch_put(xps);
goto out_err;
+ }
args->servername = xprt->servername;
args->nodename = clnt->cl_nodename;
- new = rpc_new_client(args, xprt, clnt);
+ new = rpc_new_client(args, xps, xprt, clnt);
if (IS_ERR(new)) {
err = PTR_ERR(new);
goto out_err;
@@ -657,6 +690,7 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt,
{
const struct rpc_timeout *old_timeo;
rpc_authflavor_t pseudoflavor;
+ struct rpc_xprt_switch *xps, *oldxps;
struct rpc_xprt *xprt, *old;
struct rpc_clnt *parent;
int err;
@@ -668,10 +702,17 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt,
return PTR_ERR(xprt);
}
+ xps = xprt_switch_alloc(xprt, GFP_KERNEL);
+ if (xps == NULL) {
+ xprt_put(xprt);
+ return -ENOMEM;
+ }
+
pseudoflavor = clnt->cl_auth->au_flavor;
old_timeo = clnt->cl_timeout;
old = rpc_clnt_set_transport(clnt, xprt, timeout);
+ oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
rpc_unregister_client(clnt);
__rpc_clnt_remove_pipedir(clnt);
@@ -697,20 +738,74 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt,
synchronize_rcu();
if (parent != clnt)
rpc_release_client(parent);
+ xprt_switch_put(oldxps);
xprt_put(old);
dprintk("RPC: replaced xprt for clnt %p\n", clnt);
return 0;
out_revert:
+ xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
rpc_clnt_set_transport(clnt, old, old_timeo);
clnt->cl_parent = parent;
rpc_client_register(clnt, pseudoflavor, NULL);
+ xprt_switch_put(xps);
xprt_put(xprt);
dprintk("RPC: failed to switch xprt for clnt %p\n", clnt);
return err;
}
EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
+static
+int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
+{
+ struct rpc_xprt_switch *xps;
+
+ rcu_read_lock();
+ xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
+ rcu_read_unlock();
+ if (xps == NULL)
+ return -EAGAIN;
+ xprt_iter_init_listall(xpi, xps);
+ xprt_switch_put(xps);
+ return 0;
+}
+
+/**
+ * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
+ * @clnt: pointer to client
+ * @fn: function to apply
+ * @data: void pointer to function data
+ *
+ * Iterates through the list of RPC transports currently attached to the
+ * client and applies the function fn(clnt, xprt, data).
+ *
+ * On error, the iteration stops, and the function returns the error value.
+ */
+int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
+ int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
+ void *data)
+{
+ struct rpc_xprt_iter xpi;
+ int ret;
+
+ ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
+ if (ret)
+ return ret;
+ for (;;) {
+ struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
+
+ if (!xprt)
+ break;
+ ret = fn(clnt, xprt, data);
+ xprt_put(xprt);
+ if (ret < 0)
+ break;
+ }
+ xprt_iter_destroy(&xpi);
+ return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
+
/*
* Kill all tasks for the given client.
* XXX: kill their descendants as well?
@@ -783,6 +878,7 @@ rpc_free_client(struct rpc_clnt *clnt)
rpc_free_iostats(clnt->cl_metrics);
clnt->cl_metrics = NULL;
xprt_put(rcu_dereference_raw(clnt->cl_xprt));
+ xprt_iter_destroy(&clnt->cl_xpi);
rpciod_down();
rpc_free_clid(clnt);
kfree(clnt);
@@ -868,6 +964,7 @@ EXPORT_SYMBOL_GPL(rpc_bind_new_program);
void rpc_task_release_client(struct rpc_task *task)
{
struct rpc_clnt *clnt = task->tk_client;
+ struct rpc_xprt *xprt = task->tk_xprt;
if (clnt != NULL) {
/* Remove from client task list */
@@ -878,13 +975,22 @@ void rpc_task_release_client(struct rpc_task *task)
rpc_release_client(clnt);
}
+
+ if (xprt != NULL) {
+ task->tk_xprt = NULL;
+
+ xprt_put(xprt);
+ }
}
static
void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
+
if (clnt != NULL) {
rpc_task_release_client(task);
+ if (task->tk_xprt == NULL)
+ task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
task->tk_client = clnt;
atomic_inc(&clnt->cl_count);
if (clnt->cl_softrtry)
@@ -900,14 +1006,6 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
}
}
-void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
-{
- rpc_task_release_client(task);
- rpc_task_set_client(task, clnt);
-}
-EXPORT_SYMBOL_GPL(rpc_task_reset_client);
-
-
static void
rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
{
@@ -1335,6 +1433,23 @@ size_t rpc_max_payload(struct rpc_clnt *clnt)
EXPORT_SYMBOL_GPL(rpc_max_payload);
/**
+ * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
+ * @clnt: RPC client to query
+ */
+size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
+{
+ struct rpc_xprt *xprt;
+ size_t ret;
+
+ rcu_read_lock();
+ xprt = rcu_dereference(clnt->cl_xprt);
+ ret = xprt->ops->bc_maxpayload(xprt);
+ rcu_read_unlock();
+ return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
+
+/**
* rpc_get_timeout - Get timeout for transport in units of HZ
* @clnt: RPC client to query
*/
@@ -2104,11 +2219,9 @@ call_timeout(struct rpc_task *task)
}
if (RPC_IS_SOFT(task)) {
if (clnt->cl_chatty) {
- rcu_read_lock();
printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
clnt->cl_program->name,
- rcu_dereference(clnt->cl_xprt)->servername);
- rcu_read_unlock();
+ task->tk_xprt->servername);
}
if (task->tk_flags & RPC_TASK_TIMEOUT)
rpc_exit(task, -ETIMEDOUT);
@@ -2120,11 +2233,9 @@ call_timeout(struct rpc_task *task)
if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
task->tk_flags |= RPC_CALL_MAJORSEEN;
if (clnt->cl_chatty) {
- rcu_read_lock();
printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
clnt->cl_program->name,
- rcu_dereference(clnt->cl_xprt)->servername);
- rcu_read_unlock();
+ task->tk_xprt->servername);
}
}
rpc_force_rebind(clnt);
@@ -2154,11 +2265,9 @@ call_decode(struct rpc_task *task)
if (task->tk_flags & RPC_CALL_MAJORSEEN) {
if (clnt->cl_chatty) {
- rcu_read_lock();
printk(KERN_NOTICE "%s: server %s OK\n",
clnt->cl_program->name,
- rcu_dereference(clnt->cl_xprt)->servername);
- rcu_read_unlock();
+ task->tk_xprt->servername);
}
task->tk_flags &= ~RPC_CALL_MAJORSEEN;
}
@@ -2312,11 +2421,9 @@ rpc_verify_header(struct rpc_task *task)
task->tk_action = call_bind;
goto out_retry;
case RPC_AUTH_TOOWEAK:
- rcu_read_lock();
printk(KERN_NOTICE "RPC: server %s requires stronger "
"authentication.\n",
- rcu_dereference(clnt->cl_xprt)->servername);
- rcu_read_unlock();
+ task->tk_xprt->servername);
break;
default:
dprintk("RPC: %5u %s: unknown auth error: %x\n",
@@ -2341,27 +2448,27 @@ rpc_verify_header(struct rpc_task *task)
case RPC_SUCCESS:
return p;
case RPC_PROG_UNAVAIL:
- dprintk_rcu("RPC: %5u %s: program %u is unsupported "
+ dprintk("RPC: %5u %s: program %u is unsupported "
"by server %s\n", task->tk_pid, __func__,
(unsigned int)clnt->cl_prog,
- rcu_dereference(clnt->cl_xprt)->servername);
+ task->tk_xprt->servername);
error = -EPFNOSUPPORT;
goto out_err;
case RPC_PROG_MISMATCH:
- dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
+ dprintk("RPC: %5u %s: program %u, version %u unsupported "
"by server %s\n", task->tk_pid, __func__,
(unsigned int)clnt->cl_prog,
(unsigned int)clnt->cl_vers,
- rcu_dereference(clnt->cl_xprt)->servername);
+ task->tk_xprt->servername);
error = -EPROTONOSUPPORT;
goto out_err;
case RPC_PROC_UNAVAIL:
- dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
+ dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
"version %u on server %s\n",
task->tk_pid, __func__,
rpc_proc_name(task),
clnt->cl_prog, clnt->cl_vers,
- rcu_dereference(clnt->cl_xprt)->servername);
+ task->tk_xprt->servername);
error = -EOPNOTSUPP;
goto out_err;
case RPC_GARBAGE_ARGS:
@@ -2421,7 +2528,10 @@ static int rpc_ping(struct rpc_clnt *clnt)
return err;
}
-struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
+static
+struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
+ struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
+ const struct rpc_call_ops *ops, void *data)
{
struct rpc_message msg = {
.rpc_proc = &rpcproc_null,
@@ -2429,14 +2539,140 @@ struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int
};
struct rpc_task_setup task_setup_data = {
.rpc_client = clnt,
+ .rpc_xprt = xprt,
.rpc_message = &msg,
- .callback_ops = &rpc_default_ops,
+ .callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
+ .callback_data = data,
.flags = flags,
};
+
return rpc_run_task(&task_setup_data);
}
+
+struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
+{
+ return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
+}
EXPORT_SYMBOL_GPL(rpc_call_null);
+struct rpc_cb_add_xprt_calldata {
+ struct rpc_xprt_switch *xps;
+ struct rpc_xprt *xprt;
+};
+
+static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
+{
+ struct rpc_cb_add_xprt_calldata *data = calldata;
+
+ if (task->tk_status == 0)
+ rpc_xprt_switch_add_xprt(data->xps, data->xprt);
+}
+
+static void rpc_cb_add_xprt_release(void *calldata)
+{
+ struct rpc_cb_add_xprt_calldata *data = calldata;
+
+ xprt_put(data->xprt);
+ xprt_switch_put(data->xps);
+ kfree(data);
+}
+
+const static struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
+ .rpc_call_done = rpc_cb_add_xprt_done,
+ .rpc_release = rpc_cb_add_xprt_release,
+};
+
+/**
+ * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
+ * @clnt: pointer to struct rpc_clnt
+ * @xps: pointer to struct rpc_xprt_switch
+ * @xprt: pointer to struct rpc_xprt
+ * @dummy: unused
+ */
+int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
+ struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
+ void *dummy)
+{
+ struct rpc_cb_add_xprt_calldata *data;
+ struct rpc_cred *cred;
+ struct rpc_task *task;
+
+ data = kmalloc(sizeof(*data), GFP_NOFS);
+ if (!data)
+ return -ENOMEM;
+ data->xps = xprt_switch_get(xps);
+ data->xprt = xprt_get(xprt);
+
+ cred = authnull_ops.lookup_cred(NULL, NULL, 0);
+ task = rpc_call_null_helper(clnt, xprt, cred,
+ RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC,
+ &rpc_cb_add_xprt_call_ops, data);
+ put_rpccred(cred);
+ if (IS_ERR(task))
+ return PTR_ERR(task);
+ rpc_put_task(task);
+ return 1;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
+
+/**
+ * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
+ * @clnt: pointer to struct rpc_clnt
+ * @xprtargs: pointer to struct xprt_create
+ * @setup: callback to test and/or set up the connection
+ * @data: pointer to setup function data
+ *
+ * Creates a new transport using the parameters set in xprtargs and
+ * adds it to clnt.
+ * If setup is set, it is called first; the new transport is only
+ * added here if setup returns zero.
+ *
+ */
+int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
+ struct xprt_create *xprtargs,
+ int (*setup)(struct rpc_clnt *,
+ struct rpc_xprt_switch *,
+ struct rpc_xprt *,
+ void *),
+ void *data)
+{
+ struct rpc_xprt_switch *xps;
+ struct rpc_xprt *xprt;
+ unsigned char resvport;
+ int ret = 0;
+
+ rcu_read_lock();
+ xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
+ xprt = xprt_iter_xprt(&clnt->cl_xpi);
+ if (xps == NULL || xprt == NULL) {
+ rcu_read_unlock();
+ return -EAGAIN;
+ }
+ resvport = xprt->resvport;
+ rcu_read_unlock();
+
+ xprt = xprt_create_transport(xprtargs);
+ if (IS_ERR(xprt)) {
+ ret = PTR_ERR(xprt);
+ goto out_put_switch;
+ }
+ xprt->resvport = resvport;
+
+ rpc_xprt_switch_set_roundrobin(xps);
+ if (setup) {
+ ret = setup(clnt, xps, xprt, data);
+ if (ret != 0)
+ goto out_put_xprt;
+ }
+ rpc_xprt_switch_add_xprt(xps, xprt);
+out_put_xprt:
+ xprt_put(xprt);
+out_put_switch:
+ xprt_switch_put(xps);
+ return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
+
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static void rpc_show_header(void)
{
@@ -2483,57 +2719,39 @@ void rpc_show_tasks(struct net *net)
#endif
#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
+/*
+ * Per-transport callback handed to rpc_clnt_iterate_for_each_xprt():
+ * enables swap on @xprt and returns the result of xprt_enable_swap().
+ * @clnt and @dummy are not used.
+ */
+static int
+rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
+ struct rpc_xprt *xprt,
+ void *dummy)
+{
+ return xprt_enable_swap(xprt);
+}
+
int
rpc_clnt_swap_activate(struct rpc_clnt *clnt)
{
- int ret = 0;
- struct rpc_xprt *xprt;
-
- if (atomic_inc_return(&clnt->cl_swapper) == 1) {
-retry:
- rcu_read_lock();
- xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
- rcu_read_unlock();
- if (!xprt) {
- /*
- * If we didn't get a reference, then we likely are
- * racing with a migration event. Wait for a grace
- * period and try again.
- */
- synchronize_rcu();
- goto retry;
- }
-
- ret = xprt_enable_swap(xprt);
- xprt_put(xprt);
- }
- return ret;
+ if (atomic_inc_return(&clnt->cl_swapper) == 1)
+ return rpc_clnt_iterate_for_each_xprt(clnt,
+ rpc_clnt_swap_activate_callback, NULL);
+ return 0;
}
EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
+/*
+ * Per-transport callback handed to rpc_clnt_iterate_for_each_xprt():
+ * disables swap on @xprt. Always returns 0; @clnt and @dummy are
+ * not used.
+ */
+static int
+rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
+ struct rpc_xprt *xprt,
+ void *dummy)
+{
+ xprt_disable_swap(xprt);
+ return 0;
+}
+
void
rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
{
- struct rpc_xprt *xprt;
-
- if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) {
-retry:
- rcu_read_lock();
- xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
- rcu_read_unlock();
- if (!xprt) {
- /*
- * If we didn't get a reference, then we likely are
- * racing with a migration event. Wait for a grace
- * period and try again.
- */
- synchronize_rcu();
- goto retry;
- }
-
- xprt_disable_swap(xprt);
- xprt_put(xprt);
- }
+ if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
+ rpc_clnt_iterate_for_each_xprt(clnt,
+ rpc_clnt_swap_deactivate_callback, NULL);
}
EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
#endif /* CONFIG_SUNRPC_SWAP */
diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 31789ef3e614..fc48eca21fd2 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -1390,8 +1390,8 @@ rpc_fill_super(struct super_block *sb, void *data, int silent)
struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
int err;
- sb->s_blocksize = PAGE_CACHE_SIZE;
- sb->s_blocksize_bits = PAGE_CACHE_SHIFT;
+ sb->s_blocksize = PAGE_SIZE;
+ sb->s_blocksize_bits = PAGE_SHIFT;
sb->s_magic = RPCAUTH_GSSMAGIC;
sb->s_op = &s_ops;
sb->s_d_op = &simple_dentry_operations;
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index cf5770d8f49a..5b30603596d0 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -648,10 +648,10 @@ static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt, struct rpcbi
static struct rpc_clnt *rpcb_find_transport_owner(struct rpc_clnt *clnt)
{
struct rpc_clnt *parent = clnt->cl_parent;
- struct rpc_xprt *xprt = rcu_dereference(clnt->cl_xprt);
+ struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch);
while (parent != clnt) {
- if (rcu_dereference(parent->cl_xprt) != xprt)
+ if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps)
break;
if (clnt->cl_autobind)
break;
@@ -683,11 +683,9 @@ void rpcb_getport_async(struct rpc_task *task)
int status;
rcu_read_lock();
- do {
- clnt = rpcb_find_transport_owner(task->tk_client);
- xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
- } while (xprt == NULL);
+ clnt = rpcb_find_transport_owner(task->tk_client);
rcu_read_unlock();
+ xprt = xprt_get(task->tk_xprt);
dprintk("RPC: %5u %s(%s, %u, %u, %d)\n",
task->tk_pid, __func__,
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 73ad57a59989..fcfd48d263f6 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -909,6 +909,8 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta
/* Initialize workqueue for async tasks */
task->tk_workqueue = task_setup_data->workqueue;
+ task->tk_xprt = xprt_get(task_setup_data->rpc_xprt);
+
if (task->tk_ops->rpc_call_prepare != NULL)
task->tk_action = rpc_prepare_task;
diff --git a/net/sunrpc/socklib.c b/net/sunrpc/socklib.c
index 2df87f78e518..f217c348b341 100644
--- a/net/sunrpc/socklib.c
+++ b/net/sunrpc/socklib.c
@@ -96,8 +96,8 @@ ssize_t xdr_partial_copy_from_skb(struct xdr_buf *xdr, unsigned int base, struct
if (base || xdr->page_base) {
pglen -= base;
base += xdr->page_base;
- ppage += base >> PAGE_CACHE_SHIFT;
- base &= ~PAGE_CACHE_MASK;
+ ppage += base >> PAGE_SHIFT;
+ base &= ~PAGE_MASK;
}
do {
char *kaddr;
@@ -113,7 +113,7 @@ ssize_t xdr_partial_copy_from_skb(struct xdr_buf *xdr, unsigned int base, struct
}
}
- len = PAGE_CACHE_SIZE;
+ len = PAGE_SIZE;
kaddr = kmap_atomic(*ppage);
if (base) {
len -= base;
@@ -155,7 +155,7 @@ int csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
struct xdr_skb_reader desc;
desc.skb = skb;
- desc.offset = sizeof(struct udphdr);
+ desc.offset = 0;
desc.count = skb->len - desc.offset;
if (skb_csum_unnecessary(skb))
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 7422f28818b2..4f01f63102ee 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -136,6 +136,8 @@ static void svc_xprt_free(struct kref *kref)
/* See comment on corresponding get in xs_setup_bc_tcp(): */
if (xprt->xpt_bc_xprt)
xprt_put(xprt->xpt_bc_xprt);
+ if (xprt->xpt_bc_xps)
+ xprt_switch_put(xprt->xpt_bc_xps);
xprt->xpt_ops->xpo_free(xprt);
module_put(owner);
}
@@ -244,13 +246,12 @@ void svc_add_new_perm_xprt(struct svc_serv *serv, struct svc_xprt *new)
svc_xprt_received(new);
}
-int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
+int _svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
struct net *net, const int family,
const unsigned short port, int flags)
{
struct svc_xprt_class *xcl;
- dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
spin_lock(&svc_xprt_class_lock);
list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
struct svc_xprt *newxprt;
@@ -274,12 +275,28 @@ int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
}
err:
spin_unlock(&svc_xprt_class_lock);
- dprintk("svc: transport %s not found\n", xprt_name);
-
/* This errno is exposed to user space. Provide a reasonable
* perror msg for a bad transport. */
return -EPROTONOSUPPORT;
}
+
+/*
+ * Create a server transport of class @xprt_name. If the class is not
+ * registered (-EPROTONOSUPPORT), try to load the "svc<xprt_name>"
+ * module and retry once. Returns whatever _svc_create_xprt() returned
+ * on the last attempt.
+ */
+int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
+		    struct net *net, const int family,
+		    const unsigned short port, int flags)
+{
+	int err;
+
+	dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
+	err = _svc_create_xprt(serv, xprt_name, net, family, port, flags);
+	if (err == -EPROTONOSUPPORT) {
+		request_module("svc%s", xprt_name);
+		err = _svc_create_xprt(serv, xprt_name, net, family, port, flags);
+	}
+	/*
+	 * Only negative values are failures.
+	 * NOTE(review): _svc_create_xprt() is believed to return the bound
+	 * port (> 0) on success - confirm; a plain "if (err)" would then
+	 * log a bogus "not found" message for every successful create.
+	 */
+	if (err < 0)
+		dprintk("svc: transport %s not found, err %d\n",
+			xprt_name, err);
+	return err;
+}
EXPORT_SYMBOL_GPL(svc_create_xprt);
/*
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 1413cdcc131c..dadfec66dbd8 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -85,8 +85,7 @@ static void svc_reclassify_socket(struct socket *sock)
{
struct sock *sk = sock->sk;
- WARN_ON_ONCE(sock_owned_by_user(sk));
- if (sock_owned_by_user(sk))
+ if (WARN_ON_ONCE(!sock_allow_reclassification(sk)))
return;
switch (sk->sk_family) {
@@ -617,7 +616,7 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
svsk->sk_sk->sk_stamp = skb->tstamp;
set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); /* there may be more data... */
- len = skb->len - sizeof(struct udphdr);
+ len = skb->len;
rqstp->rq_arg.len = len;
rqstp->rq_prot = IPPROTO_UDP;
@@ -641,8 +640,7 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
skb_free_datagram_locked(svsk->sk_sk, skb);
} else {
/* we can use it in-place */
- rqstp->rq_arg.head[0].iov_base = skb->data +
- sizeof(struct udphdr);
+ rqstp->rq_arg.head[0].iov_base = skb->data;
rqstp->rq_arg.head[0].iov_len = len;
if (skb_checksum_complete(skb))
goto out_free;
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 4439ac4c1b53..c4f3cc0c0775 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -164,7 +164,7 @@ EXPORT_SYMBOL_GPL(xdr_inline_pages);
* Note: the addresses pgto_base and pgfrom_base are both calculated in
* the same way:
* if a memory area starts at byte 'base' in page 'pages[i]',
- * then its address is given as (i << PAGE_CACHE_SHIFT) + base
+ * then its address is given as (i << PAGE_SHIFT) + base
* Also note: pgfrom_base must be < pgto_base, but the memory areas
* they point to may overlap.
*/
@@ -181,20 +181,20 @@ _shift_data_right_pages(struct page **pages, size_t pgto_base,
pgto_base += len;
pgfrom_base += len;
- pgto = pages + (pgto_base >> PAGE_CACHE_SHIFT);
- pgfrom = pages + (pgfrom_base >> PAGE_CACHE_SHIFT);
+ pgto = pages + (pgto_base >> PAGE_SHIFT);
+ pgfrom = pages + (pgfrom_base >> PAGE_SHIFT);
- pgto_base &= ~PAGE_CACHE_MASK;
- pgfrom_base &= ~PAGE_CACHE_MASK;
+ pgto_base &= ~PAGE_MASK;
+ pgfrom_base &= ~PAGE_MASK;
do {
/* Are any pointers crossing a page boundary? */
if (pgto_base == 0) {
- pgto_base = PAGE_CACHE_SIZE;
+ pgto_base = PAGE_SIZE;
pgto--;
}
if (pgfrom_base == 0) {
- pgfrom_base = PAGE_CACHE_SIZE;
+ pgfrom_base = PAGE_SIZE;
pgfrom--;
}
@@ -236,11 +236,11 @@ _copy_to_pages(struct page **pages, size_t pgbase, const char *p, size_t len)
char *vto;
size_t copy;
- pgto = pages + (pgbase >> PAGE_CACHE_SHIFT);
- pgbase &= ~PAGE_CACHE_MASK;
+ pgto = pages + (pgbase >> PAGE_SHIFT);
+ pgbase &= ~PAGE_MASK;
for (;;) {
- copy = PAGE_CACHE_SIZE - pgbase;
+ copy = PAGE_SIZE - pgbase;
if (copy > len)
copy = len;
@@ -253,7 +253,7 @@ _copy_to_pages(struct page **pages, size_t pgbase, const char *p, size_t len)
break;
pgbase += copy;
- if (pgbase == PAGE_CACHE_SIZE) {
+ if (pgbase == PAGE_SIZE) {
flush_dcache_page(*pgto);
pgbase = 0;
pgto++;
@@ -280,11 +280,11 @@ _copy_from_pages(char *p, struct page **pages, size_t pgbase, size_t len)
char *vfrom;
size_t copy;
- pgfrom = pages + (pgbase >> PAGE_CACHE_SHIFT);
- pgbase &= ~PAGE_CACHE_MASK;
+ pgfrom = pages + (pgbase >> PAGE_SHIFT);
+ pgbase &= ~PAGE_MASK;
do {
- copy = PAGE_CACHE_SIZE - pgbase;
+ copy = PAGE_SIZE - pgbase;
if (copy > len)
copy = len;
@@ -293,7 +293,7 @@ _copy_from_pages(char *p, struct page **pages, size_t pgbase, size_t len)
kunmap_atomic(vfrom);
pgbase += copy;
- if (pgbase == PAGE_CACHE_SIZE) {
+ if (pgbase == PAGE_SIZE) {
pgbase = 0;
pgfrom++;
}
@@ -797,6 +797,8 @@ void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)
xdr_set_iov(xdr, buf->head, buf->len);
else if (buf->page_len != 0)
xdr_set_page_base(xdr, 0, buf->len);
+ else
+ xdr_set_iov(xdr, buf->head, buf->len);
if (p != NULL && p > xdr->p && xdr->end >= p) {
xdr->nwords -= p - xdr->p;
xdr->p = p;
@@ -1038,8 +1040,8 @@ xdr_buf_subsegment(struct xdr_buf *buf, struct xdr_buf *subbuf,
if (base < buf->page_len) {
subbuf->page_len = min(buf->page_len - base, len);
base += buf->page_base;
- subbuf->page_base = base & ~PAGE_CACHE_MASK;
- subbuf->pages = &buf->pages[base >> PAGE_CACHE_SHIFT];
+ subbuf->page_base = base & ~PAGE_MASK;
+ subbuf->pages = &buf->pages[base >> PAGE_SHIFT];
len -= subbuf->page_len;
base = 0;
} else {
@@ -1297,9 +1299,9 @@ xdr_xcode_array2(struct xdr_buf *buf, unsigned int base,
todo -= avail_here;
base += buf->page_base;
- ppages = buf->pages + (base >> PAGE_CACHE_SHIFT);
- base &= ~PAGE_CACHE_MASK;
- avail_page = min_t(unsigned int, PAGE_CACHE_SIZE - base,
+ ppages = buf->pages + (base >> PAGE_SHIFT);
+ base &= ~PAGE_MASK;
+ avail_page = min_t(unsigned int, PAGE_SIZE - base,
avail_here);
c = kmap(*ppages) + base;
@@ -1383,7 +1385,7 @@ xdr_xcode_array2(struct xdr_buf *buf, unsigned int base,
}
avail_page = min(avail_here,
- (unsigned int) PAGE_CACHE_SIZE);
+ (unsigned int) PAGE_SIZE);
}
base = buf->page_len; /* align to start of tail */
}
@@ -1479,9 +1481,9 @@ xdr_process_buf(struct xdr_buf *buf, unsigned int offset, unsigned int len,
if (page_len > len)
page_len = len;
len -= page_len;
- page_offset = (offset + buf->page_base) & (PAGE_CACHE_SIZE - 1);
- i = (offset + buf->page_base) >> PAGE_CACHE_SHIFT;
- thislen = PAGE_CACHE_SIZE - page_offset;
+ page_offset = (offset + buf->page_base) & (PAGE_SIZE - 1);
+ i = (offset + buf->page_base) >> PAGE_SHIFT;
+ thislen = PAGE_SIZE - page_offset;
do {
if (thislen > page_len)
thislen = page_len;
@@ -1492,7 +1494,7 @@ xdr_process_buf(struct xdr_buf *buf, unsigned int offset, unsigned int len,
page_len -= thislen;
i++;
page_offset = 0;
- thislen = PAGE_CACHE_SIZE;
+ thislen = PAGE_SIZE;
} while (page_len != 0);
offset = 0;
}
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 37edea6fa92d..216a1385718a 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -48,6 +48,7 @@
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
+#include <linux/rcupdate.h>
#include <trace/events/sunrpc.h>
@@ -1166,7 +1167,7 @@ void xprt_free(struct rpc_xprt *xprt)
{
put_net(xprt->xprt_net);
xprt_free_all_slots(xprt);
- kfree(xprt);
+ kfree_rcu(xprt, rcu);
}
EXPORT_SYMBOL_GPL(xprt_free);
@@ -1180,7 +1181,7 @@ EXPORT_SYMBOL_GPL(xprt_free);
*/
void xprt_reserve(struct rpc_task *task)
{
- struct rpc_xprt *xprt;
+ struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0;
if (task->tk_rqstp != NULL)
@@ -1188,11 +1189,8 @@ void xprt_reserve(struct rpc_task *task)
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
- rcu_read_lock();
- xprt = rcu_dereference(task->tk_client->cl_xprt);
if (!xprt_throttle_congested(xprt, task))
xprt->ops->alloc_slot(xprt, task);
- rcu_read_unlock();
}
/**
@@ -1206,7 +1204,7 @@ void xprt_reserve(struct rpc_task *task)
*/
void xprt_retry_reserve(struct rpc_task *task)
{
- struct rpc_xprt *xprt;
+ struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0;
if (task->tk_rqstp != NULL)
@@ -1214,10 +1212,7 @@ void xprt_retry_reserve(struct rpc_task *task)
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
- rcu_read_lock();
- xprt = rcu_dereference(task->tk_client->cl_xprt);
xprt->ops->alloc_slot(xprt, task);
- rcu_read_unlock();
}
static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
@@ -1264,11 +1259,9 @@ void xprt_release(struct rpc_task *task)
if (req == NULL) {
if (task->tk_client) {
- rcu_read_lock();
- xprt = rcu_dereference(task->tk_client->cl_xprt);
+ xprt = task->tk_xprt;
if (xprt->snd_task == task)
xprt_release_write(xprt, task);
- rcu_read_unlock();
}
return;
}
@@ -1307,7 +1300,7 @@ void xprt_release(struct rpc_task *task)
static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
- atomic_set(&xprt->count, 1);
+ kref_init(&xprt->kref);
spin_lock_init(&xprt->transport_lock);
spin_lock_init(&xprt->reserve_lock);
@@ -1318,6 +1311,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
spin_lock_init(&xprt->bc_pa_lock);
INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+ INIT_LIST_HEAD(&xprt->xprt_switch);
xprt->last_used = jiffies;
xprt->cwnd = RPC_INITCWND;
@@ -1415,6 +1409,24 @@ static void xprt_destroy(struct rpc_xprt *xprt)
xprt->ops->destroy(xprt);
}
+/* kref release callback: map the kref back to its transport and destroy it */
+static void xprt_destroy_kref(struct kref *kref)
+{
+	struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);
+
+	xprt_destroy(xprt);
+}
+
+/**
+ * xprt_get - return a reference to an RPC transport.
+ * @xprt: pointer to the transport (may be NULL)
+ *
+ * Returns @xprt with its reference count raised, or NULL if @xprt is
+ * NULL or its reference count has already reached zero.
+ */
+struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
+{
+	if (xprt == NULL)
+		return NULL;
+	if (!kref_get_unless_zero(&xprt->kref))
+		return NULL;
+	return xprt;
+}
+EXPORT_SYMBOL_GPL(xprt_get);
+
/**
* xprt_put - release a reference to an RPC transport.
* @xprt: pointer to the transport
@@ -1422,7 +1434,7 @@ static void xprt_destroy(struct rpc_xprt *xprt)
*/
void xprt_put(struct rpc_xprt *xprt)
{
- if (atomic_dec_and_test(&xprt->count))
- xprt_destroy(xprt);
+ if (xprt != NULL)
+ kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);
diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c
new file mode 100644
index 000000000000..e7fd76975d86
--- /dev/null
+++ b/net/sunrpc/xprtmultipath.c
@@ -0,0 +1,475 @@
+/*
+ * Multipath support for RPC
+ *
+ * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
+ *
+ * Trond Myklebust <trond.myklebust@primarydata.com>
+ *
+ */
+#include <linux/types.h>
+#include <linux/kref.h>
+#include <linux/list.h>
+#include <linux/rcupdate.h>
+#include <linux/rculist.h>
+#include <linux/slab.h>
+#include <asm/cmpxchg.h>
+#include <linux/spinlock.h>
+#include <linux/sunrpc/xprt.h>
+#include <linux/sunrpc/xprtmultipath.h>
+
+typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head,
+ const struct rpc_xprt *cur);
+
+static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular;
+static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin;
+static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall;
+
+/*
+ * Append @xprt to the tail of the transport list of @xps, taking a new
+ * reference on it. Does nothing if no reference can be obtained
+ * (xprt_get() returns NULL). The first transport added also sets
+ * xps_net to that transport's xprt_net.
+ * NOTE(review): the "_locked" suffix suggests the caller holds
+ * xps->xps_lock; rpc_xprt_switch_add_xprt() does, xprt_switch_alloc()
+ * calls this on a freshly allocated switch without taking it.
+ */
+static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps,
+ struct rpc_xprt *xprt)
+{
+ if (unlikely(xprt_get(xprt) == NULL))
+ return;
+ list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list);
+ /* Write barrier between the list insertion and the count/net update */
+ smp_wmb();
+ if (xps->xps_nxprts == 0)
+ xps->xps_net = xprt->xprt_net;
+ xps->xps_nxprts++;
+}
+
+/**
+ * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ * @xprt: pointer to struct rpc_xprt (NULL is ignored)
+ *
+ * Appends xprt to the tail of the list of struct rpc_xprt in xps,
+ * provided that xps either has no net set yet or has the same net as
+ * xprt. Otherwise the list is left untouched.
+ */
+void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
+		struct rpc_xprt *xprt)
+{
+	if (!xprt)
+		return;
+
+	spin_lock(&xps->xps_lock);
+	if (xps->xps_net == NULL || xps->xps_net == xprt->xprt_net)
+		xprt_switch_add_xprt_locked(xps, xprt);
+	spin_unlock(&xps->xps_lock);
+}
+
+/*
+ * Unlink @xprt from the transport list of @xps and decrement the
+ * transport count; clears xps_net when the count reaches zero.
+ * A NULL @xprt is ignored. This does not drop the reference on @xprt;
+ * the visible callers call xprt_put() themselves after unlocking.
+ */
+static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps,
+ struct rpc_xprt *xprt)
+{
+ if (unlikely(xprt == NULL))
+ return;
+ xps->xps_nxprts--;
+ if (xps->xps_nxprts == 0)
+ xps->xps_net = NULL;
+ /* Write barrier between the count/net update and the list removal */
+ smp_wmb();
+ list_del_rcu(&xprt->xprt_switch);
+}
+
+/**
+ * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ * @xprt: pointer to struct rpc_xprt
+ *
+ * Removes xprt from the list of struct rpc_xprt in xps, then drops one
+ * reference on xprt once the lock has been released.
+ * NOTE(review): the reference is dropped unconditionally; this looks
+ * like it assumes xprt is currently a member of xps - confirm callers.
+ */
+void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
+ struct rpc_xprt *xprt)
+{
+ spin_lock(&xps->xps_lock);
+ xprt_switch_remove_xprt_locked(xps, xprt);
+ spin_unlock(&xps->xps_lock);
+ xprt_put(xprt);
+}
+
+/**
+ * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch
+ * @xprt: pointer to struct rpc_xprt
+ * @gfp_flags: allocation flags
+ *
+ * On success, returns an initialised struct rpc_xprt_switch using the
+ * "singular" iterator policy and containing the entry xprt (if a
+ * reference on xprt could be taken). Returns NULL on allocation
+ * failure.
+ */
+struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
+		gfp_t gfp_flags)
+{
+	struct rpc_xprt_switch *xps;
+
+	xps = kmalloc(sizeof(*xps), gfp_flags);
+	if (xps != NULL) {
+		spin_lock_init(&xps->xps_lock);
+		kref_init(&xps->xps_kref);
+		xps->xps_nxprts = 0;
+		/*
+		 * kmalloc() does not zero the allocation, and the add below
+		 * only sets xps_net when it manages to take a reference on
+		 * xprt. rpc_xprt_switch_add_xprt() compares against xps_net,
+		 * so it must never be left uninitialised.
+		 */
+		xps->xps_net = NULL;
+		INIT_LIST_HEAD(&xps->xps_xprt_list);
+		xps->xps_iter_ops = &rpc_xprt_iter_singular;
+		xprt_switch_add_xprt_locked(xps, xprt);
+	}
+
+	return xps;
+}
+
+/*
+ * Empty the transport list of @xps: repeatedly unlink the first entry
+ * and drop the reference held on it. The lock is released around each
+ * xprt_put() and retaken before the list is examined again.
+ */
+static void xprt_switch_free_entries(struct rpc_xprt_switch *xps)
+{
+ spin_lock(&xps->xps_lock);
+ while (!list_empty(&xps->xps_xprt_list)) {
+ struct rpc_xprt *xprt;
+
+ xprt = list_first_entry(&xps->xps_xprt_list,
+ struct rpc_xprt, xprt_switch);
+ xprt_switch_remove_xprt_locked(xps, xprt);
+ /* xprt_put() is called with xps_lock dropped */
+ spin_unlock(&xps->xps_lock);
+ xprt_put(xprt);
+ spin_lock(&xps->xps_lock);
+ }
+ spin_unlock(&xps->xps_lock);
+}
+
+/*
+ * kref release callback for a struct rpc_xprt_switch: releases every
+ * transport on the list, then frees the switch itself via kfree_rcu().
+ */
+static void xprt_switch_free(struct kref *kref)
+{
+ struct rpc_xprt_switch *xps = container_of(kref,
+ struct rpc_xprt_switch, xps_kref);
+
+ xprt_switch_free_entries(xps);
+ kfree_rcu(xps, xps_rcu);
+}
+
+/**
+ * xprt_switch_get - Return a reference to a rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch (may be NULL)
+ *
+ * Returns xps with its refcount raised, or NULL if xps is NULL or its
+ * refcount is already zero.
+ */
+struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps)
+{
+	if (xps == NULL)
+		return NULL;
+	if (!kref_get_unless_zero(&xps->xps_kref))
+		return NULL;
+	return xps;
+}
+
+/**
+ * xprt_switch_put - Release a reference to a rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch (NULL is ignored)
+ *
+ * Drops one reference on xps; the switch is freed when the refcount
+ * reaches zero.
+ */
+void xprt_switch_put(struct rpc_xprt_switch *xps)
+{
+	if (xps == NULL)
+		return;
+	kref_put(&xps->xps_kref, xprt_switch_free);
+}
+
+/**
+ * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ *
+ * Makes round-robin the default policy for iterators acting on xps.
+ * The policy pointer is only written when it actually changes.
+ */
+void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps)
+{
+	const struct rpc_xprt_iter_ops *current_ops;
+
+	current_ops = READ_ONCE(xps->xps_iter_ops);
+	if (current_ops == &rpc_xprt_iter_roundrobin)
+		return;
+	WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin);
+}
+
+/*
+ * Select the ops table for an iterator: the iterator's own ops if it
+ * has any, otherwise the default ops of the switch it points at.
+ */
+static
+const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi)
+{
+	const struct rpc_xprt_iter_ops *ops = xpi->xpi_ops;
+
+	if (ops == NULL)
+		ops = rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops;
+	return ops;
+}
+
+/* Rewind operation that intentionally does nothing */
+static
+void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi)
+{
+}
+
+/* Rewind operation that resets the iterator's cursor to NULL */
+static
+void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi)
+{
+ WRITE_ONCE(xpi->xpi_cursor, NULL);
+}
+
+/* Return the first transport on the list @head, or NULL if it is empty */
+static
+struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head)
+{
+ return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch);
+}
+
+/*
+ * Return the first transport of the switch the iterator points at,
+ * or NULL if the iterator has no switch or the switch is empty.
+ */
+static
+struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi)
+{
+	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
+
+	return xps ? xprt_switch_find_first_entry(&xps->xps_xprt_list) : NULL;
+}
+
+/*
+ * Look for @cur on the list @head. Returns @cur if it is still a
+ * member of the list, NULL otherwise.
+ */
+static
+struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head,
+		const struct rpc_xprt *cur)
+{
+	struct rpc_xprt *entry, *match = NULL;
+
+	list_for_each_entry_rcu(entry, head, xprt_switch) {
+		if (entry == cur) {
+			match = entry;
+			break;
+		}
+	}
+	return match;
+}
+
+/*
+ * Return the transport the iterator's cursor refers to. With no cursor
+ * set, or fewer than two transports in the switch, this is the first
+ * entry of the list; otherwise it is the cursor entry if that is still
+ * on the list, else NULL. Returns NULL when there is no switch.
+ */
+static
+struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi)
+{
+	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
+	struct list_head *head;
+
+	if (!xps)
+		return NULL;
+
+	head = &xps->xps_xprt_list;
+	if (xpi->xpi_cursor != NULL && xps->xps_nxprts >= 2)
+		return xprt_switch_find_current_entry(head, xpi->xpi_cursor);
+	return xprt_switch_find_first_entry(head);
+}
+
+/*
+ * Return the entry that follows @cur on the list @head.
+ * A NULL @cur yields the first entry, because "prev" starts out NULL
+ * and therefore matches on the first pass. Returns NULL if the list is
+ * empty, if @cur is the last entry, or if @cur is not on the list.
+ */
+static
+struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head,
+ const struct rpc_xprt *cur)
+{
+ struct rpc_xprt *pos, *prev = NULL;
+
+ list_for_each_entry_rcu(pos, head, xprt_switch) {
+ /* The entry visited just before this one was cur: pos is next */
+ if (cur == prev)
+ return pos;
+ prev = pos;
+ }
+ return NULL;
+}
+
+/*
+ * Advance *cursor to the entry chosen by @find_next and return that
+ * entry. The cursor is updated with cmpxchg; if another updater
+ * changed it in the meantime, the search is retried starting from the
+ * value that updater installed. Returns NULL, leaving the cursor
+ * untouched, when @find_next finds no successor.
+ */
+static
+struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head,
+ struct rpc_xprt **cursor,
+ xprt_switch_find_xprt_t find_next)
+{
+ struct rpc_xprt *cur, *pos, *old;
+
+ cur = READ_ONCE(*cursor);
+ for (;;) {
+ old = cur;
+ pos = find_next(head, old);
+ if (pos == NULL)
+ break;
+ /* cmpxchg returns the previous value: equal to old means we won */
+ cur = cmpxchg_relaxed(cursor, old, pos);
+ if (cur == old)
+ break;
+ }
+ return pos;
+}
+
+/*
+ * Common "next entry" helper for the multi-transport policies.
+ * Returns NULL if the iterator has no switch. With fewer than two
+ * transports the first entry is returned and the cursor is not
+ * touched; otherwise the cursor is advanced using @find_next.
+ */
+static
+struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi,
+		xprt_switch_find_xprt_t find_next)
+{
+	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
+	struct list_head *head;
+
+	if (!xps)
+		return NULL;
+
+	head = &xps->xps_xprt_list;
+	if (xps->xps_nxprts >= 2)
+		return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor,
+						   find_next);
+	return xprt_switch_find_first_entry(head);
+}
+
+/*
+ * Round-robin successor: the entry after @cur, wrapping around to the
+ * first entry of @head when there is no successor.
+ */
+static
+struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head,
+		const struct rpc_xprt *cur)
+{
+	struct rpc_xprt *next = xprt_switch_find_next_entry(head, cur);
+
+	if (next == NULL)
+		next = xprt_switch_find_first_entry(head);
+	return next;
+}
+
+/* "next" operation of the round-robin policy: advance with wrap-around */
+static
+struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi)
+{
+ return xprt_iter_next_entry_multiple(xpi,
+ xprt_switch_find_next_entry_roundrobin);
+}
+
+/* "next" operation of the list-all policy: advance without wrap-around */
+static
+struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi)
+{
+ return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry);
+}
+
+/*
+ * xprt_iter_rewind - Resets the xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Invokes the rewind operation of the iterator's policy under
+ * rcu_read_lock(). For the round-robin and list-all policies this
+ * clears the cursor, so iteration restarts from the first entry in the
+ * list of transports; for the singular policy it is a no-op.
+ */
+static
+void xprt_iter_rewind(struct rpc_xprt_iter *xpi)
+{
+ rcu_read_lock();
+ xprt_iter_ops(xpi)->xpi_rewind(xpi);
+ rcu_read_unlock();
+}
+
+/*
+ * Common iterator initialisation: point @xpi at @xps (taking a
+ * reference via xprt_switch_get()), clear the cursor and record @ops.
+ * A NULL @ops makes the iterator follow the default policy of the
+ * switch.
+ */
+static void __xprt_iter_init(struct rpc_xprt_iter *xpi,
+ struct rpc_xprt_switch *xps,
+ const struct rpc_xprt_iter_ops *ops)
+{
+ rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps));
+ xpi->xpi_cursor = NULL;
+ xpi->xpi_ops = ops;
+}
+
+/**
+ * xprt_iter_init - Initialise an xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ * @xps: pointer to rpc_xprt_switch
+ *
+ * Initialises the iterator to use the default iterator ops
+ * as set in xps (no iterator-specific ops are installed).
+ * This function is mainly intended for internal use in the rpc_client.
+ */
+void xprt_iter_init(struct rpc_xprt_iter *xpi,
+ struct rpc_xprt_switch *xps)
+{
+ __xprt_iter_init(xpi, xps, NULL);
+}
+
+/**
+ * xprt_iter_init_listall - Initialise an xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ * @xps: pointer to rpc_xprt_switch
+ *
+ * Initialises the iterator to iterate once through the entire list
+ * of entries in xps, regardless of the default policy set in xps.
+ */
+void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
+ struct rpc_xprt_switch *xps)
+{
+ __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall);
+}
+
+/**
+ * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch
+ * @xpi: pointer to rpc_xprt_iter
+ * @newswitch: pointer to a new rpc_xprt_switch or NULL
+ *
+ * Swaps out the existing xpi->xpi_xpswitch with a new value and, when
+ * the new value is not NULL, rewinds the iterator.
+ *
+ * Returns the previous switch. No reference counts are changed here.
+ */
+struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi,
+ struct rpc_xprt_switch *newswitch)
+{
+ struct rpc_xprt_switch __rcu *oldswitch;
+
+ /* Atomically swap out the old xpswitch */
+ oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch));
+ if (newswitch != NULL)
+ xprt_iter_rewind(xpi);
+ return rcu_dereference_protected(oldswitch, true);
+}
+
+/**
+ * xprt_iter_destroy - Destroys the xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Detaches the iterator from its switch (the switch pointer becomes
+ * NULL) and drops the reference that was held on the old switch.
+ */
+void xprt_iter_destroy(struct rpc_xprt_iter *xpi)
+{
+ xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL));
+}
+
+/**
+ * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Returns a pointer to the struct rpc_xprt that is currently
+ * pointed to by the cursor. No reference is taken on the result.
+ * Caller must be holding rcu_read_lock(); a one-time warning is
+ * emitted otherwise.
+ */
+struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi)
+{
+ WARN_ON_ONCE(!rcu_read_lock_held());
+ return xprt_iter_ops(xpi)->xpi_xprt(xpi);
+}
+
+/*
+ * Call @fn on the iterator until it either yields nothing (return
+ * NULL) or yields a transport on which a reference could be taken
+ * (return that transport, referenced).
+ */
+static
+struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi,
+		struct rpc_xprt *(*fn)(struct rpc_xprt_iter *))
+{
+	struct rpc_xprt *candidate;
+
+	for (;;) {
+		candidate = fn(xpi);
+		if (candidate == NULL)
+			return NULL;
+		/* Refcount already zero: ask the policy again */
+		if (xprt_get(candidate) != NULL)
+			return candidate;
+	}
+}
+
+/**
+ * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Returns a reference to the struct rpc_xprt that is currently
+ * pointed to by the cursor, or NULL if the policy yields no entry.
+ * Takes rcu_read_lock() internally.
+ */
+struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi)
+{
+ struct rpc_xprt *xprt;
+
+ rcu_read_lock();
+ xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt);
+ rcu_read_unlock();
+ return xprt;
+}
+
+/**
+ * xprt_iter_get_next - Returns the next rpc_xprt following the cursor
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Returns a reference to the struct rpc_xprt that immediately follows the
+ * entry pointed to by the cursor, or NULL if the policy yields no
+ * further entry. Takes rcu_read_lock() internally.
+ */
+struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi)
+{
+ struct rpc_xprt *xprt;
+
+ rcu_read_lock();
+ xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next);
+ rcu_read_unlock();
+ return xprt;
+}
+
+/* Policy for always returning the first entry in the rpc_xprt_switch */
+static
+const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = {
+ .xpi_rewind = xprt_iter_no_rewind,
+ .xpi_xprt = xprt_iter_first_entry,
+ .xpi_next = xprt_iter_first_entry,
+};
+
+/* Policy for round-robin iteration of entries in the rpc_xprt_switch */
+static
+const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = {
+ .xpi_rewind = xprt_iter_default_rewind,
+ .xpi_xprt = xprt_iter_current_entry,
+ .xpi_next = xprt_iter_next_entry_roundrobin,
+};
+
+/* Policy for once-through iteration of entries in the rpc_xprt_switch */
+static
+const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = {
+ .xpi_rewind = xprt_iter_default_rewind,
+ .xpi_xprt = xprt_iter_current_entry,
+ .xpi_next = xprt_iter_next_entry_all,
+};
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 2dcd7640eeb5..87762d976b63 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -192,6 +192,22 @@ int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
}
/**
+ * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
+ * @xprt: transport
+ *
+ * Returns maximum size, in bytes, of a backchannel message
+ */
+size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
+{
+	struct rpcrdma_create_data_internal *cdata;
+	struct rpcrdma_xprt *r_xprt;
+	size_t maxmsg;
+
+	r_xprt = rpcx_to_rdmax(xprt);
+	cdata = &r_xprt->rx_data;
+
+	/* Smaller of the two inline sizes, less the minimal header length */
+	maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
+	maxmsg -= RPCRDMA_HDRLEN_MIN;
+	return maxmsg;
+}
+
+/**
* rpcrdma_bc_marshal_reply - Send backwards direction reply
* @rqst: buffer containing RPC reply data
*
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index c14f3a4bff68..6326ebe8b595 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -35,10 +35,71 @@
/* Maximum scatter/gather per FMR */
#define RPCRDMA_MAX_FMR_SGES (64)
+static struct workqueue_struct *fmr_recovery_wq;
+
+#define FMR_RECOVERY_WQ_FLAGS (WQ_UNBOUND)
+
+/*
+ * Allocate the workqueue used for deferred FMR recovery.
+ * Returns 0 on success or -ENOMEM if the workqueue cannot be created.
+ */
+int
+fmr_alloc_recovery_wq(void)
+{
+	/* Use the flags macro defined for this workqueue, not a literal */
+	fmr_recovery_wq = alloc_workqueue("fmr_recovery",
+					  FMR_RECOVERY_WQ_FLAGS, 0);
+	return fmr_recovery_wq ? 0 : -ENOMEM;
+}
+
+/*
+ * Tear down the FMR recovery workqueue, if one was allocated. The
+ * global pointer is cleared before the workqueue is destroyed.
+ */
+void
+fmr_destroy_recovery_wq(void)
+{
+	struct workqueue_struct *wq = fmr_recovery_wq;
+
+	if (wq) {
+		fmr_recovery_wq = NULL;
+		destroy_workqueue(wq);
+	}
+}
+
+/*
+ * Unmap the single FMR owned by @mw: place it on a temporary
+ * one-entry list and hand that list to ib_unmap_fmr().
+ * Returns the result of ib_unmap_fmr().
+ */
+static int
+__fmr_unmap(struct rpcrdma_mw *mw)
+{
+ LIST_HEAD(l);
+
+ list_add(&mw->fmr.fmr->list, &l);
+ return ib_unmap_fmr(&l);
+}
+
+/* Work handler for the deferred reset of a single FMR: unmap the
+ * FMR, then hand the MW back with rpcrdma_put_mw(). The result of
+ * the unmap is not checked; there is no recovery if it fails.
+ */
+static void
+__fmr_recovery_worker(struct work_struct *work)
+{
+	struct rpcrdma_mw *mw;
+
+	mw = container_of(work, struct rpcrdma_mw, mw_work);
+	__fmr_unmap(mw);
+	rpcrdma_put_mw(mw->mw_xprt, mw);
+}
+
+/* A broken MR was discovered in a context that can't sleep.
+ * Defer recovery to the recovery worker: (re)initialise the MW's
+ * work item and queue it on fmr_recovery_wq.
+ */
+static void
+__fmr_queue_recovery(struct rpcrdma_mw *mw)
+{
+ INIT_WORK(&mw->mw_work, __fmr_recovery_worker);
+ queue_work(fmr_recovery_wq, &mw->mw_work);
+}
+
static int
fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
struct rpcrdma_create_data_internal *cdata)
{
+ rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
+ RPCRDMA_MAX_DATA_SEGS /
+ RPCRDMA_MAX_FMR_SGES));
return 0;
}
@@ -48,7 +109,7 @@ static size_t
fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
{
return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- rpcrdma_max_segments(r_xprt) * RPCRDMA_MAX_FMR_SGES);
+ RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES);
}
static int
@@ -80,39 +141,31 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
if (!r)
goto out;
- r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
- sizeof(u64), GFP_KERNEL);
- if (!r->r.fmr.physaddrs)
+ r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
+ sizeof(u64), GFP_KERNEL);
+ if (!r->fmr.physaddrs)
goto out_free;
- r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
- if (IS_ERR(r->r.fmr.fmr))
+ r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
+ if (IS_ERR(r->fmr.fmr))
goto out_fmr_err;
+ r->mw_xprt = r_xprt;
list_add(&r->mw_list, &buf->rb_mws);
list_add(&r->mw_all, &buf->rb_all);
}
return 0;
out_fmr_err:
- rc = PTR_ERR(r->r.fmr.fmr);
+ rc = PTR_ERR(r->fmr.fmr);
dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc);
- kfree(r->r.fmr.physaddrs);
+ kfree(r->fmr.physaddrs);
out_free:
kfree(r);
out:
return rc;
}
-static int
-__fmr_unmap(struct rpcrdma_mw *r)
-{
- LIST_HEAD(l);
-
- list_add(&r->r.fmr.fmr->list, &l);
- return ib_unmap_fmr(&l);
-}
-
/* Use the ib_map_phys_fmr() verb to register a memory region
* for remote access via RDMA READ or RDMA WRITE.
*/
@@ -148,7 +201,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
nsegs = RPCRDMA_MAX_FMR_SGES;
for (i = 0; i < nsegs;) {
rpcrdma_map_one(device, seg, direction);
- mw->r.fmr.physaddrs[i] = seg->mr_dma;
+ mw->fmr.physaddrs[i] = seg->mr_dma;
len += seg->mr_len;
++seg;
++i;
@@ -158,13 +211,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
break;
}
- rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs,
+ rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs,
i, seg1->mr_dma);
if (rc)
goto out_maperr;
seg1->rl_mw = mw;
- seg1->mr_rkey = mw->r.fmr.fmr->rkey;
+ seg1->mr_rkey = mw->fmr.fmr->rkey;
seg1->mr_base = seg1->mr_dma + pageoff;
seg1->mr_nsegs = i;
seg1->mr_len = len;
@@ -183,15 +236,10 @@ static void
__fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
{
struct ib_device *device = r_xprt->rx_ia.ri_device;
- struct rpcrdma_mw *mw = seg->rl_mw;
int nsegs = seg->mr_nsegs;
- seg->rl_mw = NULL;
-
while (nsegs--)
rpcrdma_unmap_one(device, seg++);
-
- rpcrdma_put_mw(r_xprt, mw);
}
/* Invalidate all memory regions that were registered for "req".
@@ -219,7 +267,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
seg = &req->rl_segments[i];
mw = seg->rl_mw;
- list_add(&mw->r.fmr.fmr->list, &unmap_list);
+ list_add(&mw->fmr.fmr->list, &unmap_list);
i += seg->mr_nsegs;
}
@@ -234,42 +282,50 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
seg = &req->rl_segments[i];
__fmr_dma_unmap(r_xprt, seg);
+ rpcrdma_put_mw(r_xprt, seg->rl_mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
+ seg->rl_mw = NULL;
}
req->rl_nchunks = 0;
}
-/* Use the ib_unmap_fmr() verb to prevent further remote
- * access via RDMA READ or RDMA WRITE.
+/* Use a slow, safe mechanism to invalidate all memory regions
+ * that were registered for "req".
+ *
+ * In the asynchronous case, DMA unmapping occurs first here
+ * because the rpcrdma_mr_seg is released immediately after this
+ * call. Its contents won't be available in __fmr_dma_unmap later.
+ * FIXME.
*/
-static int
-fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
+static void
+fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ bool sync)
{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_mr_seg *seg1 = seg;
- struct rpcrdma_mw *mw = seg1->rl_mw;
- int rc, nsegs = seg->mr_nsegs;
+ struct rpcrdma_mr_seg *seg;
+ struct rpcrdma_mw *mw;
+ unsigned int i;
- dprintk("RPC: %s: FMR %p\n", __func__, mw);
+ for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
+ seg = &req->rl_segments[i];
+ mw = seg->rl_mw;
- seg1->rl_mw = NULL;
- while (seg1->mr_nsegs--)
- rpcrdma_unmap_one(ia->ri_device, seg++);
- rc = __fmr_unmap(mw);
- if (rc)
- goto out_err;
- rpcrdma_put_mw(r_xprt, mw);
- return nsegs;
+ if (sync) {
+ /* ORDER */
+ __fmr_unmap(mw);
+ __fmr_dma_unmap(r_xprt, seg);
+ rpcrdma_put_mw(r_xprt, mw);
+ } else {
+ __fmr_dma_unmap(r_xprt, seg);
+ __fmr_queue_recovery(mw);
+ }
-out_err:
- /* The FMR is abandoned, but remains in rb_all. fmr_op_destroy
- * will attempt to release it when the transport is destroyed.
- */
- dprintk("RPC: %s: ib_unmap_fmr status %i\n", __func__, rc);
- return nsegs;
+ i += seg->mr_nsegs;
+ seg->mr_nsegs = 0;
+ seg->rl_mw = NULL;
+ }
}
static void
@@ -281,9 +337,9 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
- kfree(r->r.fmr.physaddrs);
+ kfree(r->fmr.physaddrs);
- rc = ib_dealloc_fmr(r->r.fmr.fmr);
+ rc = ib_dealloc_fmr(r->fmr.fmr);
if (rc)
dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
__func__, rc);
@@ -295,7 +351,7 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
.ro_map = fmr_op_map,
.ro_unmap_sync = fmr_op_unmap_sync,
- .ro_unmap = fmr_op_unmap,
+ .ro_unmap_safe = fmr_op_unmap_safe,
.ro_open = fmr_op_open,
.ro_maxpages = fmr_op_maxpages,
.ro_init = fmr_op_init,
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index e16567389e28..c0947544babe 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -98,6 +98,47 @@ frwr_destroy_recovery_wq(void)
destroy_workqueue(wq);
}
+static int
+__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
+{
+ struct rpcrdma_frmr *f = &r->frmr;
+ int rc;
+
+ rc = ib_dereg_mr(f->fr_mr);
+ if (rc) {
+ pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
+ rc, r);
+ return rc;
+ }
+
+ f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG,
+ ia->ri_max_frmr_depth);
+ if (IS_ERR(f->fr_mr)) {
+ pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
+ PTR_ERR(f->fr_mr), r);
+ return PTR_ERR(f->fr_mr);
+ }
+
+ dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
+ f->fr_state = FRMR_IS_INVALID;
+ return 0;
+}
+
+static void
+__frwr_reset_and_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_frmr *f = &mw->frmr;
+ int rc;
+
+ rc = __frwr_reset_mr(ia, mw);
+ ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, f->fr_dir);
+ if (rc)
+ return;
+
+ rpcrdma_put_mw(r_xprt, mw);
+}
+
/* Deferred reset of a single FRMR. Generate a fresh rkey by
* replacing the MR.
*
@@ -109,26 +150,10 @@ static void
__frwr_recovery_worker(struct work_struct *work)
{
struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
- r.frmr.fr_work);
- struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt;
- unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
- struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
-
- if (ib_dereg_mr(r->r.frmr.fr_mr))
- goto out_fail;
-
- r->r.frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
- if (IS_ERR(r->r.frmr.fr_mr))
- goto out_fail;
+ mw_work);
- dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
- r->r.frmr.fr_state = FRMR_IS_INVALID;
- rpcrdma_put_mw(r_xprt, r);
+ __frwr_reset_and_unmap(r->mw_xprt, r);
return;
-
-out_fail:
- pr_warn("RPC: %s: FRMR %p unrecovered\n",
- __func__, r);
}
/* A broken MR was discovered in a context that can't sleep.
@@ -137,26 +162,28 @@ out_fail:
static void
__frwr_queue_recovery(struct rpcrdma_mw *r)
{
- INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker);
- queue_work(frwr_recovery_wq, &r->r.frmr.fr_work);
+ INIT_WORK(&r->mw_work, __frwr_recovery_worker);
+ queue_work(frwr_recovery_wq, &r->mw_work);
}
static int
__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
unsigned int depth)
{
- struct rpcrdma_frmr *f = &r->r.frmr;
+ struct rpcrdma_frmr *f = &r->frmr;
int rc;
f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
if (IS_ERR(f->fr_mr))
goto out_mr_err;
- f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL);
- if (!f->sg)
+ f->fr_sg = kcalloc(depth, sizeof(*f->fr_sg), GFP_KERNEL);
+ if (!f->fr_sg)
goto out_list_err;
- sg_init_table(f->sg, depth);
+ sg_init_table(f->fr_sg, depth);
+
+ init_completion(&f->fr_linv_done);
return 0;
@@ -179,11 +206,11 @@ __frwr_release(struct rpcrdma_mw *r)
{
int rc;
- rc = ib_dereg_mr(r->r.frmr.fr_mr);
+ rc = ib_dereg_mr(r->frmr.fr_mr);
if (rc)
dprintk("RPC: %s: ib_dereg_mr status %i\n",
__func__, rc);
- kfree(r->r.frmr.sg);
+ kfree(r->frmr.fr_sg);
}
static int
@@ -229,6 +256,9 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
depth;
}
+ rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
+ RPCRDMA_MAX_DATA_SEGS /
+ ia->ri_max_frmr_depth));
return 0;
}
@@ -241,42 +271,79 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
+ RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frmr_depth);
+}
+
+static void
+__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr,
+ const char *wr)
+{
+ frmr->fr_state = FRMR_IS_STALE;
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
+ wr, ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
}
-/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs
- * to be reset.
+/**
+ * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
*
- * WARNING: Only wr_id and status are reliable at this point
*/
static void
-__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r)
+frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
{
- if (likely(wc->status == IB_WC_SUCCESS))
- return;
-
- /* WARNING: Only wr_id and status are reliable at this point */
- r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- if (wc->status == IB_WC_WR_FLUSH_ERR)
- dprintk("RPC: %s: frmr %p flushed\n", __func__, r);
- else
- pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
- __func__, r, ib_wc_status_msg(wc->status), wc->status);
+ struct rpcrdma_frmr *frmr;
+ struct ib_cqe *cqe;
- r->r.frmr.fr_state = FRMR_IS_STALE;
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ if (wc->status != IB_WC_SUCCESS) {
+ cqe = wc->wr_cqe;
+ frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+ __frwr_sendcompletion_flush(wc, frmr, "fastreg");
+ }
}
+/**
+ * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
static void
-frwr_sendcompletion(struct ib_wc *wc)
+frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
{
- struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- struct rpcrdma_frmr *f = &r->r.frmr;
+ struct rpcrdma_frmr *frmr;
+ struct ib_cqe *cqe;
- if (unlikely(wc->status != IB_WC_SUCCESS))
- __frwr_sendcompletion_flush(wc, r);
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ if (wc->status != IB_WC_SUCCESS) {
+ cqe = wc->wr_cqe;
+ frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+ __frwr_sendcompletion_flush(wc, frmr, "localinv");
+ }
+}
- if (f->fr_waiter)
- complete(&f->fr_linv_done);
+/**
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for each polled LocalInv WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ * Awaken anyone waiting for an MR to finish being fenced.
+ */
+static void
+frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct rpcrdma_frmr *frmr;
+ struct ib_cqe *cqe;
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ cqe = wc->wr_cqe;
+ frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+ if (wc->status != IB_WC_SUCCESS)
+ __frwr_sendcompletion_flush(wc, frmr, "localinv");
+ complete_all(&frmr->fr_linv_done);
}
static int
@@ -311,10 +378,9 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
return rc;
}
+ r->mw_xprt = r_xprt;
list_add(&r->mw_list, &buf->rb_mws);
list_add(&r->mw_all, &buf->rb_all);
- r->mw_sendcompletion = frwr_sendcompletion;
- r->r.frmr.fr_xprt = r_xprt;
}
return 0;
@@ -347,10 +413,9 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
mw = rpcrdma_get_mw(r_xprt);
if (!mw)
return -ENOMEM;
- } while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
- frmr = &mw->r.frmr;
+ } while (mw->frmr.fr_state != FRMR_IS_INVALID);
+ frmr = &mw->frmr;
frmr->fr_state = FRMR_IS_VALID;
- frmr->fr_waiter = false;
mr = frmr->fr_mr;
reg_wr = &frmr->fr_regwr;
@@ -359,12 +424,12 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
for (i = 0; i < nsegs;) {
if (seg->mr_page)
- sg_set_page(&frmr->sg[i],
+ sg_set_page(&frmr->fr_sg[i],
seg->mr_page,
seg->mr_len,
offset_in_page(seg->mr_offset));
else
- sg_set_buf(&frmr->sg[i], seg->mr_offset,
+ sg_set_buf(&frmr->fr_sg[i], seg->mr_offset,
seg->mr_len);
++seg;
@@ -375,32 +440,34 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
break;
}
- frmr->sg_nents = i;
+ frmr->fr_nents = i;
+ frmr->fr_dir = direction;
- dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction);
+ dma_nents = ib_dma_map_sg(device, frmr->fr_sg, frmr->fr_nents, direction);
if (!dma_nents) {
pr_err("RPC: %s: failed to dma map sg %p sg_nents %u\n",
- __func__, frmr->sg, frmr->sg_nents);
+ __func__, frmr->fr_sg, frmr->fr_nents);
return -ENOMEM;
}
- n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, PAGE_SIZE);
- if (unlikely(n != frmr->sg_nents)) {
+ n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, NULL, PAGE_SIZE);
+ if (unlikely(n != frmr->fr_nents)) {
pr_err("RPC: %s: failed to map mr %p (%u/%u)\n",
- __func__, frmr->fr_mr, n, frmr->sg_nents);
+ __func__, frmr->fr_mr, n, frmr->fr_nents);
rc = n < 0 ? n : -EINVAL;
goto out_senderr;
}
dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n",
- __func__, mw, frmr->sg_nents, mr->length);
+ __func__, mw, frmr->fr_nents, mr->length);
key = (u8)(mr->rkey & 0x000000FF);
ib_update_fast_reg_key(mr, ++key);
reg_wr->wr.next = NULL;
reg_wr->wr.opcode = IB_WR_REG_MR;
- reg_wr->wr.wr_id = (uintptr_t)mw;
+ frmr->fr_cqe.done = frwr_wc_fastreg;
+ reg_wr->wr.wr_cqe = &frmr->fr_cqe;
reg_wr->wr.num_sge = 0;
reg_wr->wr.send_flags = 0;
reg_wr->mr = mr;
@@ -414,18 +481,16 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
if (rc)
goto out_senderr;
- seg1->mr_dir = direction;
seg1->rl_mw = mw;
seg1->mr_rkey = mr->rkey;
seg1->mr_base = mr->iova;
- seg1->mr_nsegs = frmr->sg_nents;
+ seg1->mr_nsegs = frmr->fr_nents;
seg1->mr_len = mr->length;
- return frmr->sg_nents;
+ return frmr->fr_nents;
out_senderr:
dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc);
- ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction);
__frwr_queue_recovery(mw);
return rc;
}
@@ -434,39 +499,21 @@ static struct ib_send_wr *
__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
{
struct rpcrdma_mw *mw = seg->rl_mw;
- struct rpcrdma_frmr *f = &mw->r.frmr;
+ struct rpcrdma_frmr *f = &mw->frmr;
struct ib_send_wr *invalidate_wr;
- f->fr_waiter = false;
f->fr_state = FRMR_IS_INVALID;
invalidate_wr = &f->fr_invwr;
memset(invalidate_wr, 0, sizeof(*invalidate_wr));
- invalidate_wr->wr_id = (unsigned long)(void *)mw;
+ f->fr_cqe.done = frwr_wc_localinv;
+ invalidate_wr->wr_cqe = &f->fr_cqe;
invalidate_wr->opcode = IB_WR_LOCAL_INV;
invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey;
return invalidate_wr;
}
-static void
-__frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
- int rc)
-{
- struct ib_device *device = r_xprt->rx_ia.ri_device;
- struct rpcrdma_mw *mw = seg->rl_mw;
- struct rpcrdma_frmr *f = &mw->r.frmr;
-
- seg->rl_mw = NULL;
-
- ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir);
-
- if (!rc)
- rpcrdma_put_mw(r_xprt, mw);
- else
- __frwr_queue_recovery(mw);
-}
-
/* Invalidate all memory regions that were registered for "req".
*
* Sleeps until it is safe for the host CPU to access the
@@ -480,6 +527,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
struct rpcrdma_mr_seg *seg;
unsigned int i, nchunks;
struct rpcrdma_frmr *f;
+ struct rpcrdma_mw *mw;
int rc;
dprintk("RPC: %s: req %p\n", __func__, req);
@@ -504,15 +552,15 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
i += seg->mr_nsegs;
}
- f = &seg->rl_mw->r.frmr;
+ f = &seg->rl_mw->frmr;
/* Strong send queue ordering guarantees that when the
* last WR in the chain completes, all WRs in the chain
* are complete.
*/
f->fr_invwr.send_flags = IB_SEND_SIGNALED;
- f->fr_waiter = true;
- init_completion(&f->fr_linv_done);
+ f->fr_cqe.done = frwr_wc_localinv_wake;
+ reinit_completion(&f->fr_linv_done);
INIT_CQCOUNT(&r_xprt->rx_ep);
/* Transport disconnect drains the receive CQ before it
@@ -521,64 +569,75 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
if (rc)
- pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
+ goto reset_mrs;
wait_for_completion(&f->fr_linv_done);
/* ORDER: Now DMA unmap all of the req's MRs, and return
* them to the free MW list.
*/
+unmap:
for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
seg = &req->rl_segments[i];
+ mw = seg->rl_mw;
+ seg->rl_mw = NULL;
- __frwr_dma_unmap(r_xprt, seg, rc);
+ ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents,
+ f->fr_dir);
+ rpcrdma_put_mw(r_xprt, mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
}
req->rl_nchunks = 0;
-}
+ return;
-/* Post a LOCAL_INV Work Request to prevent further remote access
- * via RDMA READ or RDMA WRITE.
- */
-static int
-frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
-{
- struct rpcrdma_mr_seg *seg1 = seg;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_mw *mw = seg1->rl_mw;
- struct rpcrdma_frmr *frmr = &mw->r.frmr;
- struct ib_send_wr *invalidate_wr, *bad_wr;
- int rc, nsegs = seg->mr_nsegs;
+reset_mrs:
+ pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
+
+ /* Find and reset the MRs in the LOCAL_INV WRs that did not
+ * get posted. This is synchronous, and slow.
+ */
+ for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
+ seg = &req->rl_segments[i];
+ mw = seg->rl_mw;
+ f = &mw->frmr;
- dprintk("RPC: %s: FRMR %p\n", __func__, mw);
+ if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) {
+ __frwr_reset_mr(ia, mw);
+ bad_wr = bad_wr->next;
+ }
- seg1->rl_mw = NULL;
- frmr->fr_state = FRMR_IS_INVALID;
- invalidate_wr = &mw->r.frmr.fr_invwr;
+ i += seg->mr_nsegs;
+ }
+ goto unmap;
+}
- memset(invalidate_wr, 0, sizeof(*invalidate_wr));
- invalidate_wr->wr_id = (uintptr_t)mw;
- invalidate_wr->opcode = IB_WR_LOCAL_INV;
- invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
- DECR_CQCOUNT(&r_xprt->rx_ep);
+/* Use a slow, safe mechanism to invalidate all memory regions
+ * that were registered for "req".
+ */
+static void
+frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ bool sync)
+{
+ struct rpcrdma_mr_seg *seg;
+ struct rpcrdma_mw *mw;
+ unsigned int i;
- ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir);
- read_lock(&ia->ri_qplock);
- rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr);
- read_unlock(&ia->ri_qplock);
- if (rc)
- goto out_err;
+ for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
+ seg = &req->rl_segments[i];
+ mw = seg->rl_mw;
- rpcrdma_put_mw(r_xprt, mw);
- return nsegs;
+ if (sync)
+ __frwr_reset_and_unmap(r_xprt, mw);
+ else
+ __frwr_queue_recovery(mw);
-out_err:
- dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc);
- __frwr_queue_recovery(mw);
- return nsegs;
+ i += seg->mr_nsegs;
+ seg->mr_nsegs = 0;
+ seg->rl_mw = NULL;
+ }
}
static void
@@ -600,7 +659,7 @@ frwr_op_destroy(struct rpcrdma_buffer *buf)
const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
.ro_map = frwr_op_map,
.ro_unmap_sync = frwr_op_unmap_sync,
- .ro_unmap = frwr_op_unmap,
+ .ro_unmap_safe = frwr_op_unmap_safe,
.ro_open = frwr_op_open,
.ro_maxpages = frwr_op_maxpages,
.ro_init = frwr_op_init,
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
index dbb302ecf590..3750596cc432 100644
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ b/net/sunrpc/xprtrdma/physical_ops.c
@@ -36,8 +36,11 @@ physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
__func__, PTR_ERR(mr));
return -ENOMEM;
}
-
ia->ri_dma_mr = mr;
+
+ rpcrdma_set_max_header_sizes(ia, cdata, min_t(unsigned int,
+ RPCRDMA_MAX_DATA_SEGS,
+ RPCRDMA_MAX_HDR_SEGS));
return 0;
}
@@ -47,7 +50,7 @@ static size_t
physical_op_maxpages(struct rpcrdma_xprt *r_xprt)
{
return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- rpcrdma_max_segments(r_xprt));
+ RPCRDMA_MAX_HDR_SEGS);
}
static int
@@ -68,18 +71,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing));
seg->mr_rkey = ia->ri_dma_mr->rkey;
seg->mr_base = seg->mr_dma;
- seg->mr_nsegs = 1;
- return 1;
-}
-
-/* Unmap a memory region, but leave it registered.
- */
-static int
-physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
-{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-
- rpcrdma_unmap_one(ia->ri_device, seg);
return 1;
}
@@ -95,6 +86,25 @@ physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
rpcrdma_unmap_one(device, &req->rl_segments[i++]);
}
+/* Use a slow, safe mechanism to invalidate all memory regions
+ * that were registered for "req".
+ *
+ * For physical memory registration, there is no good way to
+ * fence a single MR that has been advertised to the server. The
+ * client has already handed the server an R_key that cannot be
+ * invalidated and is shared by all MRs on this connection.
+ * Tearing down the PD might be the only safe choice, but it's
+ * not clear that a freshly acquired DMA R_key would be different
+ * than the one used by the PD that was just destroyed.
+ * FIXME.
+ */
+static void
+physical_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ bool sync)
+{
+ physical_op_unmap_sync(r_xprt, req);
+}
+
static void
physical_op_destroy(struct rpcrdma_buffer *buf)
{
@@ -103,7 +113,7 @@ physical_op_destroy(struct rpcrdma_buffer *buf)
const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = {
.ro_map = physical_op_map,
.ro_unmap_sync = physical_op_unmap_sync,
- .ro_unmap = physical_op_unmap,
+ .ro_unmap_safe = physical_op_unmap_safe,
.ro_open = physical_op_open,
.ro_maxpages = physical_op_maxpages,
.ro_init = physical_op_init,
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 0f28f2d743ed..35a81096e83d 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -61,26 +61,84 @@ enum rpcrdma_chunktype {
rpcrdma_replych
};
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static const char transfertypes[][12] = {
- "pure inline", /* no chunks */
- " read chunk", /* some argument via rdma read */
- "*read chunk", /* entire request via rdma read */
- "write chunk", /* some result via rdma write */
+ "inline", /* no chunks */
+ "read list", /* some argument via rdma read */
+ "*read list", /* entire request via rdma read */
+ "write list", /* some result via rdma write */
"reply chunk" /* entire reply via rdma write */
};
-#endif
+
+/* Returns size, in bytes, of the largest RPC-over-RDMA header in a
+ * Call message carrying @maxsegs data segments (head and tail extra).
+ * The largest Call header contains a full-size Read list and a
+ * minimal Reply chunk.
+ */
+static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
+{
+ unsigned int size;
+
+ /* Fixed header fields and list discriminators */
+ size = RPCRDMA_HDRLEN_MIN;
+
+ /* Maximum Read list size: accumulate onto the fixed header part */
+ maxsegs += 2; /* segment for head and tail buffers */
+ size += maxsegs * sizeof(struct rpcrdma_read_chunk);
+
+ /* Minimal Reply chunk size */
+ size += sizeof(__be32); /* segment count */
+ size += sizeof(struct rpcrdma_segment);
+ size += sizeof(__be32); /* list discriminator */
+
+ dprintk("RPC: %s: max call header size = %u\n",
+ __func__, size);
+ return size;
+}
+
+/* Returns size, in bytes, of the largest RPC-over-RDMA header in a
+ * Reply message carrying @maxsegs data segments (head and tail extra).
+ * There is only one Write list or one Reply chunk per Reply
+ * message. The larger list is the Write list.
+ */
+static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
+{
+ unsigned int size;
+
+ /* Fixed header fields and list discriminators */
+ size = RPCRDMA_HDRLEN_MIN;
+
+ /* Maximum Write list size: accumulate onto the fixed header part */
+ maxsegs += 2; /* segment for head and tail buffers */
+ size += sizeof(__be32); /* segment count */
+ size += maxsegs * sizeof(struct rpcrdma_segment);
+ size += sizeof(__be32); /* list discriminator */
+
+ dprintk("RPC: %s: max reply header size = %u\n",
+ __func__, size);
+ return size;
+}
+
+void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
+ struct rpcrdma_create_data_internal *cdata,
+ unsigned int maxsegs)
+{
+ ia->ri_max_inline_write = cdata->inline_wsize -
+ rpcrdma_max_call_header_size(maxsegs);
+ ia->ri_max_inline_read = cdata->inline_rsize -
+ rpcrdma_max_reply_header_size(maxsegs);
+}
/* The client can send a request inline as long as the RPCRDMA header
* plus the RPC call fit under the transport's inline limit. If the
* combined call message size exceeds that limit, the client must use
* the read chunk list for this operation.
*/
-static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
+static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
+ struct rpc_rqst *rqst)
{
- unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+ return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
}
/* The client can't know how large the actual reply will be. Thus it
@@ -89,11 +147,12 @@ static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
* limit, the client must provide a write list or a reply chunk for
* this request.
*/
-static bool rpcrdma_results_inline(struct rpc_rqst *rqst)
+static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
+ struct rpc_rqst *rqst)
{
- unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+ return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}
static int
@@ -132,6 +191,33 @@ rpcrdma_tail_pullup(struct xdr_buf *buf)
return tlen;
}
+/* Split "vec" on page boundaries into segments. FMR registers pages,
+ * not a byte range. Other modes coalesce these segments into a single
+ * MR when they can.
+ */
+static int
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
+ int n, int nsegs)
+{
+ size_t page_offset;
+ u32 remaining;
+ char *base;
+
+ base = vec->iov_base;
+ page_offset = offset_in_page(base);
+ remaining = vec->iov_len;
+ while (remaining && n < nsegs) {
+ seg[n].mr_page = NULL;
+ seg[n].mr_offset = base;
+ seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
+ remaining -= seg[n].mr_len;
+ base += seg[n].mr_len;
+ ++n;
+ page_offset = 0;
+ }
+ return n;
+}
+
/*
* Chunk assembly from upper layer xdr_buf.
*
@@ -150,11 +236,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
int page_base;
struct page **ppages;
- if (pos == 0 && xdrbuf->head[0].iov_len) {
- seg[n].mr_page = NULL;
- seg[n].mr_offset = xdrbuf->head[0].iov_base;
- seg[n].mr_len = xdrbuf->head[0].iov_len;
- ++n;
+ if (pos == 0) {
+ n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
+ if (n == nsegs)
+ return -EIO;
}
len = xdrbuf->page_len;
@@ -192,35 +277,24 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
* xdr pad bytes, saving the server an RDMA operation. */
if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
return n;
+ n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
if (n == nsegs)
- /* Tail remains, but we're out of segments */
return -EIO;
- seg[n].mr_page = NULL;
- seg[n].mr_offset = xdrbuf->tail[0].iov_base;
- seg[n].mr_len = xdrbuf->tail[0].iov_len;
- ++n;
}
return n;
}
-/*
- * Create read/write chunk lists, and reply chunks, for RDMA
- *
- * Assume check against THRESHOLD has been done, and chunks are required.
- * Assume only encoding one list entry for read|write chunks. The NFSv3
- * protocol is simple enough to allow this as it only has a single "bulk
- * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
- * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
- *
- * When used for a single reply chunk (which is a special write
- * chunk used for the entire reply, rather than just the data), it
- * is used primarily for READDIR and READLINK which would otherwise
- * be severely size-limited by a small rdma inline read max. The server
- * response will come back as an RDMA Write, followed by a message
- * of type RDMA_NOMSG carrying the xid and length. As a result, reply
- * chunks do not provide data alignment, however they do not require
- * "fixup" (moving the response to the upper layer buffer) either.
+static inline __be32 *
+xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
+{
+ *iptr++ = cpu_to_be32(seg->mr_rkey);
+ *iptr++ = cpu_to_be32(seg->mr_len);
+ return xdr_encode_hyper(iptr, seg->mr_base);
+}
+
+/* XDR-encode the Read list. Supports encoding a list of read
+ * segments that belong to a single read chunk.
*
* Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
*
@@ -228,131 +302,190 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
* N elements, position P (same P for all chunks of same arg!):
* 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
*
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Read list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req, struct rpc_rqst *rqst,
+ __be32 *iptr, enum rpcrdma_chunktype rtype)
+{
+ struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ unsigned int pos;
+ int n, nsegs;
+
+ if (rtype == rpcrdma_noch) {
+ *iptr++ = xdr_zero; /* item not present */
+ return iptr;
+ }
+
+ pos = rqst->rq_snd_buf.head[0].iov_len;
+ if (rtype == rpcrdma_areadch)
+ pos = 0;
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
+ RPCRDMA_MAX_SEGS - req->rl_nchunks);
+ if (nsegs < 0)
+ return ERR_PTR(nsegs);
+
+ do {
+ n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
+ if (n <= 0)
+ return ERR_PTR(n);
+
+ *iptr++ = xdr_one; /* item present */
+
+ /* All read segments in this chunk
+ * have the same "position".
+ */
+ *iptr++ = cpu_to_be32(pos);
+ iptr = xdr_encode_rdma_segment(iptr, seg);
+
+ dprintk("RPC: %5u %s: read segment pos %u "
+ "%d@0x%016llx:0x%08x (%s)\n",
+ rqst->rq_task->tk_pid, __func__, pos,
+ seg->mr_len, (unsigned long long)seg->mr_base,
+ seg->mr_rkey, n < nsegs ? "more" : "last");
+
+ r_xprt->rx_stats.read_chunk_count++;
+ req->rl_nchunks++;
+ seg += n;
+ nsegs -= n;
+ } while (nsegs);
+ req->rl_nextseg = seg;
+
+ /* Finish Read list */
+ *iptr++ = xdr_zero; /* Next item not present */
+ return iptr;
+}
+
+/* XDR-encode the Write list. Supports encoding a list containing
+ * one array of plain segments that belong to a single write chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
* Write chunklist (a list of (one) counted array):
* N elements:
* 1 - N - HLOO - HLOO - ... - HLOO - 0
*
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Write list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ struct rpc_rqst *rqst, __be32 *iptr,
+ enum rpcrdma_chunktype wtype)
+{
+ struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ int n, nsegs, nchunks;
+ __be32 *segcount;
+
+ if (wtype != rpcrdma_writech) {
+ *iptr++ = xdr_zero; /* no Write list present */
+ return iptr;
+ }
+
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
+ rqst->rq_rcv_buf.head[0].iov_len,
+ wtype, seg,
+ RPCRDMA_MAX_SEGS - req->rl_nchunks);
+ if (nsegs < 0)
+ return ERR_PTR(nsegs);
+
+ *iptr++ = xdr_one; /* Write list present */
+ segcount = iptr++; /* save location of segment count */
+
+ nchunks = 0;
+ do {
+ n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+ if (n <= 0)
+ return ERR_PTR(n);
+
+ iptr = xdr_encode_rdma_segment(iptr, seg);
+
+ dprintk("RPC: %5u %s: write segment "
+ "%d@0x%016llx:0x%08x (%s)\n",
+ rqst->rq_task->tk_pid, __func__,
+ seg->mr_len, (unsigned long long)seg->mr_base,
+ seg->mr_rkey, n < nsegs ? "more" : "last");
+
+ r_xprt->rx_stats.write_chunk_count++;
+ r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+ req->rl_nchunks++;
+ nchunks++;
+ seg += n;
+ nsegs -= n;
+ } while (nsegs);
+ req->rl_nextseg = seg;
+
+ /* Update count of segments in this Write chunk */
+ *segcount = cpu_to_be32(nchunks);
+
+ /* Finish Write list */
+ *iptr++ = xdr_zero; /* Next item not present */
+ return iptr;
+}
+
+/* XDR-encode the Reply chunk. Supports encoding an array of plain
+ * segments that belong to a single write (reply) chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
* Reply chunk (a counted array):
* N elements:
* 1 - N - HLOO - HLOO - ... - HLOO
*
- * Returns positive RPC/RDMA header size, or negative errno.
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Reply chunk, or an error pointer.
*/
-
-static ssize_t
-rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
- struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
+static __be32 *
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req, struct rpc_rqst *rqst,
+ __be32 *iptr, enum rpcrdma_chunktype wtype)
{
- struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
- int n, nsegs, nchunks = 0;
- unsigned int pos;
- struct rpcrdma_mr_seg *seg = req->rl_segments;
- struct rpcrdma_read_chunk *cur_rchunk = NULL;
- struct rpcrdma_write_array *warray = NULL;
- struct rpcrdma_write_chunk *cur_wchunk = NULL;
- __be32 *iptr = headerp->rm_body.rm_chunks;
- int (*map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool);
-
- if (type == rpcrdma_readch || type == rpcrdma_areadch) {
- /* a read chunk - server will RDMA Read our memory */
- cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
- } else {
- /* a write or reply chunk - server will RDMA Write our memory */
- *iptr++ = xdr_zero; /* encode a NULL read chunk list */
- if (type == rpcrdma_replych)
- *iptr++ = xdr_zero; /* a NULL write chunk list */
- warray = (struct rpcrdma_write_array *) iptr;
- cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
- }
+ struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ int n, nsegs, nchunks;
+ __be32 *segcount;
- if (type == rpcrdma_replych || type == rpcrdma_areadch)
- pos = 0;
- else
- pos = target->head[0].iov_len;
+ if (wtype != rpcrdma_replych) {
+ *iptr++ = xdr_zero; /* no Reply chunk present */
+ return iptr;
+ }
- nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+ RPCRDMA_MAX_SEGS - req->rl_nchunks);
if (nsegs < 0)
- return nsegs;
+ return ERR_PTR(nsegs);
+
+ *iptr++ = xdr_one; /* Reply chunk present */
+ segcount = iptr++; /* save location of segment count */
- map = r_xprt->rx_ia.ri_ops->ro_map;
+ nchunks = 0;
do {
- n = map(r_xprt, seg, nsegs, cur_wchunk != NULL);
+ n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
if (n <= 0)
- goto out;
- if (cur_rchunk) { /* read */
- cur_rchunk->rc_discrim = xdr_one;
- /* all read chunks have the same "position" */
- cur_rchunk->rc_position = cpu_to_be32(pos);
- cur_rchunk->rc_target.rs_handle =
- cpu_to_be32(seg->mr_rkey);
- cur_rchunk->rc_target.rs_length =
- cpu_to_be32(seg->mr_len);
- xdr_encode_hyper(
- (__be32 *)&cur_rchunk->rc_target.rs_offset,
- seg->mr_base);
- dprintk("RPC: %s: read chunk "
- "elem %d@0x%llx:0x%x pos %u (%s)\n", __func__,
- seg->mr_len, (unsigned long long)seg->mr_base,
- seg->mr_rkey, pos, n < nsegs ? "more" : "last");
- cur_rchunk++;
- r_xprt->rx_stats.read_chunk_count++;
- } else { /* write/reply */
- cur_wchunk->wc_target.rs_handle =
- cpu_to_be32(seg->mr_rkey);
- cur_wchunk->wc_target.rs_length =
- cpu_to_be32(seg->mr_len);
- xdr_encode_hyper(
- (__be32 *)&cur_wchunk->wc_target.rs_offset,
- seg->mr_base);
- dprintk("RPC: %s: %s chunk "
- "elem %d@0x%llx:0x%x (%s)\n", __func__,
- (type == rpcrdma_replych) ? "reply" : "write",
- seg->mr_len, (unsigned long long)seg->mr_base,
- seg->mr_rkey, n < nsegs ? "more" : "last");
- cur_wchunk++;
- if (type == rpcrdma_replych)
- r_xprt->rx_stats.reply_chunk_count++;
- else
- r_xprt->rx_stats.write_chunk_count++;
- r_xprt->rx_stats.total_rdma_request += seg->mr_len;
- }
+ return ERR_PTR(n);
+
+ iptr = xdr_encode_rdma_segment(iptr, seg);
+
+ dprintk("RPC: %5u %s: reply segment "
+ "%d@0x%016llx:0x%08x (%s)\n",
+ rqst->rq_task->tk_pid, __func__,
+ seg->mr_len, (unsigned long long)seg->mr_base,
+ seg->mr_rkey, n < nsegs ? "more" : "last");
+
+ r_xprt->rx_stats.reply_chunk_count++;
+ r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+ req->rl_nchunks++;
nchunks++;
seg += n;
nsegs -= n;
} while (nsegs);
+ req->rl_nextseg = seg;
- /* success. all failures return above */
- req->rl_nchunks = nchunks;
-
- /*
- * finish off header. If write, marshal discrim and nchunks.
- */
- if (cur_rchunk) {
- iptr = (__be32 *) cur_rchunk;
- *iptr++ = xdr_zero; /* finish the read chunk list */
- *iptr++ = xdr_zero; /* encode a NULL write chunk list */
- *iptr++ = xdr_zero; /* encode a NULL reply chunk */
- } else {
- warray->wc_discrim = xdr_one;
- warray->wc_nchunks = cpu_to_be32(nchunks);
- iptr = (__be32 *) cur_wchunk;
- if (type == rpcrdma_writech) {
- *iptr++ = xdr_zero; /* finish the write chunk list */
- *iptr++ = xdr_zero; /* encode a NULL reply chunk */
- }
- }
-
- /*
- * Return header size.
- */
- return (unsigned char *)iptr - (unsigned char *)headerp;
+ /* Update count of segments in the Reply chunk */
+ *segcount = cpu_to_be32(nchunks);
-out:
- for (pos = 0; nchunks--;)
- pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
- &req->rl_segments[pos]);
- return n;
+ return iptr;
}
/*
@@ -418,13 +551,10 @@ static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
* Marshal a request: the primary job of this routine is to choose
* the transfer modes. See comments below.
*
- * Uses multiple RDMA IOVs for a request:
- * [0] -- RPC RDMA header, which uses memory from the *start* of the
- * preregistered buffer that already holds the RPC data in
- * its middle.
- * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
- * [2] -- optional padding.
- * [3] -- if padded, header only in [1] and data here.
+ * Prepares up to two IOVs per Call message:
+ *
+ * [0] -- RPC RDMA header
+ * [1] -- the RPC header/data
*
* Returns zero on success, otherwise a negative errno.
*/
@@ -435,24 +565,17 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
struct rpc_xprt *xprt = rqst->rq_xprt;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
- char *base;
- size_t rpclen;
- ssize_t hdrlen;
enum rpcrdma_chunktype rtype, wtype;
struct rpcrdma_msg *headerp;
+ ssize_t hdrlen;
+ size_t rpclen;
+ __be32 *iptr;
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
return rpcrdma_bc_marshal_reply(rqst);
#endif
- /*
- * rpclen gets amount of data in first buffer, which is the
- * pre-registered buffer.
- */
- base = rqst->rq_svec[0].iov_base;
- rpclen = rqst->rq_svec[0].iov_len;
-
headerp = rdmab_to_msg(req->rl_rdmabuf);
/* don't byte-swap XID, it's already done in request */
headerp->rm_xid = rqst->rq_xid;
@@ -463,15 +586,16 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
/*
* Chunks needed for results?
*
- * o Read ops return data as write chunk(s), header as inline.
* o If the expected result is under the inline threshold, all ops
* return as inline.
+ * o Large read ops return data as write chunk(s), header as
+ * inline.
* o Large non-read ops return as a single reply chunk.
*/
- if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
- wtype = rpcrdma_writech;
- else if (rpcrdma_results_inline(rqst))
+ if (rpcrdma_results_inline(r_xprt, rqst))
wtype = rpcrdma_noch;
+ else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
+ wtype = rpcrdma_writech;
else
wtype = rpcrdma_replych;
@@ -489,10 +613,14 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
* that both has a data payload, and whose non-data arguments
* by themselves are larger than the inline threshold.
*/
- if (rpcrdma_args_inline(rqst)) {
+ if (rpcrdma_args_inline(r_xprt, rqst)) {
rtype = rpcrdma_noch;
+ rpcrdma_inline_pullup(rqst);
+ rpclen = rqst->rq_svec[0].iov_len;
} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
rtype = rpcrdma_readch;
+ rpclen = rqst->rq_svec[0].iov_len;
+ rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
} else {
r_xprt->rx_stats.nomsg_call_count++;
headerp->rm_type = htonl(RDMA_NOMSG);
@@ -500,57 +628,50 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
rpclen = 0;
}
- /* The following simplification is not true forever */
- if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
- wtype = rpcrdma_noch;
- if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
- dprintk("RPC: %s: cannot marshal multiple chunk lists\n",
- __func__);
- return -EIO;
- }
-
- hdrlen = RPCRDMA_HDRLEN_MIN;
-
- /*
- * Pull up any extra send data into the preregistered buffer.
- * When padding is in use and applies to the transfer, insert
- * it and change the message type.
+ /* This implementation supports the following combinations
+ * of chunk lists in one RPC-over-RDMA Call message:
+ *
+ * - Read list
+ * - Write list
+ * - Reply chunk
+ * - Read list + Reply chunk
+ *
+ * It might not yet support the following combinations:
+ *
+ * - Read list + Write list
+ *
+ * It does not support the following combinations:
+ *
+ * - Write list + Reply chunk
+ * - Read list + Write list + Reply chunk
+ *
+ * This implementation supports only a single chunk in each
+ * Read or Write list. Thus for example the client cannot
+ * send a Call message with a Position Zero Read chunk and a
+ * regular Read chunk at the same time.
*/
- if (rtype == rpcrdma_noch) {
-
- rpcrdma_inline_pullup(rqst);
+ req->rl_nchunks = 0;
+ req->rl_nextseg = req->rl_segments;
+ iptr = headerp->rm_body.rm_chunks;
+ iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
+ if (IS_ERR(iptr))
+ goto out_unmap;
+ iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
+ if (IS_ERR(iptr))
+ goto out_unmap;
+ iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
+ if (IS_ERR(iptr))
+ goto out_unmap;
+ hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
+
+ if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+ goto out_overflow;
+
+ dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
+ rqst->rq_task->tk_pid, __func__,
+ transfertypes[rtype], transfertypes[wtype],
+ hdrlen, rpclen);
- headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
- headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
- headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
- /* new length after pullup */
- rpclen = rqst->rq_svec[0].iov_len;
- } else if (rtype == rpcrdma_readch)
- rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
- if (rtype != rpcrdma_noch) {
- hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
- headerp, rtype);
- wtype = rtype; /* simplify dprintk */
-
- } else if (wtype != rpcrdma_noch) {
- hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
- headerp, wtype);
- }
- if (hdrlen < 0)
- return hdrlen;
-
- dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd"
- " headerp 0x%p base 0x%p lkey 0x%x\n",
- __func__, transfertypes[wtype], hdrlen, rpclen,
- headerp, base, rdmab_lkey(req->rl_rdmabuf));
-
- /*
- * initialize send_iov's - normally only two: rdma chunk header and
- * single preregistered RPC header buffer, but if padding is present,
- * then use a preregistered (and zeroed) pad buffer between the RPC
- * header and any write data. In all non-rdma cases, any following
- * data has been copied into the RPC header buffer.
- */
req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
req->rl_send_iov[0].length = hdrlen;
req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
@@ -565,6 +686,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
req->rl_niovs = 2;
return 0;
+
+out_overflow:
+ pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
+ hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
+ /* Terminate this RPC. Chunks registered above will be
+ * released by xprt_release -> xprt_rdma_free .
+ */
+ return -EIO;
+
+out_unmap:
+ r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
+ return PTR_ERR(iptr);
}
/*
@@ -773,20 +906,17 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
__be32 *iptr;
- int rdmalen, status;
+ int rdmalen, status, rmerr;
unsigned long cwnd;
- u32 credits;
dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
if (rep->rr_len == RPCRDMA_BAD_LEN)
goto out_badstatus;
- if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+ if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
goto out_shortreply;
headerp = rdmab_to_msg(rep->rr_rdmabuf);
- if (headerp->rm_vers != rpcrdma_version)
- goto out_badversion;
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
if (rpcrdma_is_bcall(headerp))
goto out_bcall;
@@ -809,15 +939,16 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
*/
list_del_init(&rqst->rq_list);
spin_unlock_bh(&xprt->transport_lock);
- dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
- " RPC request 0x%p xid 0x%08x\n",
- __func__, rep, req, rqst,
- be32_to_cpu(headerp->rm_xid));
+ dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
+ __func__, rep, req, be32_to_cpu(headerp->rm_xid));
/* from here on, the reply is no longer an orphan */
req->rl_reply = rep;
xprt->reestablish_timeout = 0;
+ if (headerp->rm_vers != rpcrdma_version)
+ goto out_badversion;
+
/* check for expected message types */
/* The order of some of these tests is important. */
switch (headerp->rm_type) {
@@ -878,6 +1009,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
status = rdmalen;
break;
+ case rdma_error:
+ goto out_rdmaerr;
+
badheader:
default:
dprintk("%s: invalid rpcrdma reply header (type %d):"
@@ -893,6 +1027,7 @@ badheader:
break;
}
+out:
/* Invalidate and flush the data payloads before waking the
* waiting application. This guarantees the memory region is
* properly fenced from the server before the application
@@ -903,15 +1038,9 @@ badheader:
if (req->rl_nchunks)
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
- credits = be32_to_cpu(headerp->rm_credit);
- if (credits == 0)
- credits = 1; /* don't deadlock */
- else if (credits > r_xprt->rx_buf.rb_max_requests)
- credits = r_xprt->rx_buf.rb_max_requests;
-
spin_lock_bh(&xprt->transport_lock);
cwnd = xprt->cwnd;
- xprt->cwnd = credits << RPC_CWNDSHIFT;
+ xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);
@@ -935,13 +1064,43 @@ out_bcall:
return;
#endif
-out_shortreply:
- dprintk("RPC: %s: short/invalid reply\n", __func__);
- goto repost;
-
+/* If the incoming reply terminated a pending RPC, the next
+ * RPC call will post a replacement receive buffer as it is
+ * being marshaled.
+ */
out_badversion:
dprintk("RPC: %s: invalid version %d\n",
__func__, be32_to_cpu(headerp->rm_vers));
+ status = -EIO;
+ r_xprt->rx_stats.bad_reply_count++;
+ goto out;
+
+out_rdmaerr:
+ rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
+ switch (rmerr) {
+ case ERR_VERS:
+ pr_err("%s: server reports header version error (%u-%u)\n",
+ __func__,
+ be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
+ be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
+ break;
+ case ERR_CHUNK:
+ pr_err("%s: server reports header decoding error\n",
+ __func__);
+ break;
+ default:
+ pr_err("%s: server reports unknown error %d\n",
+ __func__, rmerr);
+ }
+ status = -EREMOTEIO;
+ r_xprt->rx_stats.bad_reply_count++;
+ goto out;
+
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
+out_shortreply:
+ dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
out_nomatch:
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 65a7c232a345..a2a7519b0f23 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -107,26 +107,18 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
int ret;
vec = svc_rdma_get_req_map(rdma);
- ret = svc_rdma_map_xdr(rdma, sndbuf, vec);
+ ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
if (ret)
goto out_err;
- /* Post a recv buffer to handle the reply for this request. */
- ret = svc_rdma_post_recv(rdma, GFP_NOIO);
- if (ret) {
- pr_err("svcrdma: Failed to post bc receive buffer, err=%d.\n",
- ret);
- pr_err("svcrdma: closing transport %p.\n", rdma);
- set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
- ret = -ENOTCONN;
+ ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
+ if (ret)
goto out_err;
- }
ctxt = svc_rdma_get_context(rdma);
ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
ctxt->count = 1;
- ctxt->wr_op = IB_WR_SEND;
ctxt->direction = DMA_TO_DEVICE;
ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
ctxt->sge[0].length = sndbuf->len;
@@ -140,7 +132,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
atomic_inc(&rdma->sc_dma_used);
memset(&send_wr, 0, sizeof(send_wr));
- send_wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_wc_send;
+ send_wr.wr_cqe = &ctxt->cqe;
send_wr.sg_list = ctxt->sge;
send_wr.num_sge = 1;
send_wr.opcode = IB_WR_SEND;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index e2fca7617242..0ba9887f3e22 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -145,63 +145,99 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend)
return (__be32 *)&ary->wc_array[nchunks];
}
-int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
- struct svc_rqst *rqstp)
+/**
+ * svc_rdma_xdr_decode_req - Parse incoming RPC-over-RDMA header
+ * @rq_arg: Receive buffer
+ *
+ * On entry, rq_arg->head[0].iov_base points to first byte in the
+ * RPC-over-RDMA header.
+ *
+ * On successful exit, head[0] points to first byte past the
+ * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
+ * The length of the RPC-over-RDMA header is returned.
+ */
+int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
{
- struct rpcrdma_msg *rmsgp = NULL;
+ struct rpcrdma_msg *rmsgp;
__be32 *va, *vaend;
+ unsigned int len;
u32 hdr_len;
- rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
-
/* Verify that there's enough bytes for header + something */
- if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+ if (rq_arg->len <= RPCRDMA_HDRLEN_ERR) {
dprintk("svcrdma: header too short = %d\n",
- rqstp->rq_arg.len);
+ rq_arg->len);
return -EINVAL;
}
- if (rmsgp->rm_vers != rpcrdma_version)
- return -ENOSYS;
-
- /* Pull in the extra for the padded case and bump our pointer */
- if (rmsgp->rm_type == rdma_msgp) {
- int hdrlen;
+ rmsgp = (struct rpcrdma_msg *)rq_arg->head[0].iov_base;
+ if (rmsgp->rm_vers != rpcrdma_version) {
+ dprintk("%s: bad version %u\n", __func__,
+ be32_to_cpu(rmsgp->rm_vers));
+ return -EPROTONOSUPPORT;
+ }
+ switch (be32_to_cpu(rmsgp->rm_type)) {
+ case RDMA_MSG:
+ case RDMA_NOMSG:
+ break;
+
+ case RDMA_DONE:
+ /* Just drop it */
+ dprintk("svcrdma: dropping RDMA_DONE message\n");
+ return 0;
+
+ case RDMA_ERROR:
+ /* Possible if this is a backchannel reply.
+ * XXX: We should cancel this XID, though.
+ */
+ dprintk("svcrdma: dropping RDMA_ERROR message\n");
+ return 0;
+
+ case RDMA_MSGP:
+ /* Pull in the extra for the padded case, bump our pointer */
rmsgp->rm_body.rm_padded.rm_align =
be32_to_cpu(rmsgp->rm_body.rm_padded.rm_align);
rmsgp->rm_body.rm_padded.rm_thresh =
be32_to_cpu(rmsgp->rm_body.rm_padded.rm_thresh);
va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
- rqstp->rq_arg.head[0].iov_base = va;
- hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
- rqstp->rq_arg.head[0].iov_len -= hdrlen;
- if (hdrlen > rqstp->rq_arg.len)
+ rq_arg->head[0].iov_base = va;
+ len = (u32)((unsigned long)va - (unsigned long)rmsgp);
+ rq_arg->head[0].iov_len -= len;
+ if (len > rq_arg->len)
return -EINVAL;
- return hdrlen;
+ return len;
+ default:
+ dprintk("svcrdma: bad rdma procedure (%u)\n",
+ be32_to_cpu(rmsgp->rm_type));
+ return -EINVAL;
}
/* The chunk list may contain either a read chunk list or a write
* chunk list and a reply chunk list.
*/
va = &rmsgp->rm_body.rm_chunks[0];
- vaend = (__be32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
+ vaend = (__be32 *)((unsigned long)rmsgp + rq_arg->len);
va = decode_read_list(va, vaend);
- if (!va)
+ if (!va) {
+ dprintk("svcrdma: failed to decode read list\n");
return -EINVAL;
+ }
va = decode_write_list(va, vaend);
- if (!va)
+ if (!va) {
+ dprintk("svcrdma: failed to decode write list\n");
return -EINVAL;
+ }
va = decode_reply_array(va, vaend);
- if (!va)
+ if (!va) {
+ dprintk("svcrdma: failed to decode reply chunk\n");
return -EINVAL;
+ }
- rqstp->rq_arg.head[0].iov_base = va;
+ rq_arg->head[0].iov_base = va;
hdr_len = (unsigned long)va - (unsigned long)rmsgp;
- rqstp->rq_arg.head[0].iov_len -= hdr_len;
-
- *rdma_req = rmsgp;
+ rq_arg->head[0].iov_len -= hdr_len;
return hdr_len;
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index c8b8a8b4181e..2c25606f2561 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -180,9 +180,9 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
memset(&read_wr, 0, sizeof(read_wr));
- read_wr.wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_wc_read;
+ read_wr.wr.wr_cqe = &ctxt->cqe;
read_wr.wr.opcode = IB_WR_RDMA_READ;
- ctxt->wr_op = read_wr.wr.opcode;
read_wr.wr.send_flags = IB_SEND_SIGNALED;
read_wr.rkey = rs_handle;
read_wr.remote_addr = rs_offset;
@@ -281,7 +281,7 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
}
atomic_inc(&xprt->sc_dma_used);
- n = ib_map_mr_sg(frmr->mr, frmr->sg, frmr->sg_nents, PAGE_SIZE);
+ n = ib_map_mr_sg(frmr->mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE);
if (unlikely(n != frmr->sg_nents)) {
pr_err("svcrdma: failed to map mr %p (%d/%d elements)\n",
frmr->mr, n, frmr->sg_nents);
@@ -299,8 +299,9 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
ctxt->read_hdr = head;
/* Prepare REG WR */
+ ctxt->reg_cqe.done = svc_rdma_wc_reg;
+ reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
reg_wr.wr.opcode = IB_WR_REG_MR;
- reg_wr.wr.wr_id = 0;
reg_wr.wr.send_flags = IB_SEND_SIGNALED;
reg_wr.wr.num_sge = 0;
reg_wr.mr = frmr->mr;
@@ -310,6 +311,8 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
/* Prepare RDMA_READ */
memset(&read_wr, 0, sizeof(read_wr));
+ ctxt->cqe.done = svc_rdma_wc_read;
+ read_wr.wr.wr_cqe = &ctxt->cqe;
read_wr.wr.send_flags = IB_SEND_SIGNALED;
read_wr.rkey = rs_handle;
read_wr.remote_addr = rs_offset;
@@ -317,19 +320,18 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
read_wr.wr.num_sge = 1;
if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
- read_wr.wr.wr_id = (unsigned long)ctxt;
read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
} else {
read_wr.wr.opcode = IB_WR_RDMA_READ;
read_wr.wr.next = &inv_wr;
/* Prepare invalidate */
memset(&inv_wr, 0, sizeof(inv_wr));
- inv_wr.wr_id = (unsigned long)ctxt;
+ ctxt->inv_cqe.done = svc_rdma_wc_inv;
+ inv_wr.wr_cqe = &ctxt->inv_cqe;
inv_wr.opcode = IB_WR_LOCAL_INV;
inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
}
- ctxt->wr_op = read_wr.wr.opcode;
/* Post the chain */
ret = svc_rdma_send(xprt, &reg_wr.wr);
@@ -445,10 +447,8 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
head->arg.len = rqstp->rq_arg.len;
head->arg.buflen = rqstp->rq_arg.buflen;
- ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
- position = be32_to_cpu(ch->rc_position);
-
/* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
+ position = be32_to_cpu(ch->rc_position);
if (position == 0) {
head->arg.pages = &head->pages[0];
page_offset = head->byte_len;
@@ -486,7 +486,7 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
if (page_offset & 3) {
u32 pad = 4 - (page_offset & 3);
- head->arg.page_len += pad;
+ head->arg.tail[0].iov_len += pad;
head->arg.len += pad;
head->arg.buflen += pad;
page_offset += pad;
@@ -508,11 +508,10 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
return ret;
}
-static int rdma_read_complete(struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head)
+static void rdma_read_complete(struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head)
{
int page_no;
- int ret;
/* Copy RPC pages */
for (page_no = 0; page_no < head->count; page_no++) {
@@ -548,23 +547,6 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
rqstp->rq_arg.tail[0] = head->arg.tail[0];
rqstp->rq_arg.len = head->arg.len;
rqstp->rq_arg.buflen = head->arg.buflen;
-
- /* Free the context */
- svc_rdma_put_context(head, 0);
-
- /* XXX: What should this be? */
- rqstp->rq_prot = IPPROTO_MAX;
- svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);
-
- ret = rqstp->rq_arg.head[0].iov_len
- + rqstp->rq_arg.page_len
- + rqstp->rq_arg.tail[0].iov_len;
- dprintk("svcrdma: deferred read ret=%d, rq_arg.len=%u, "
- "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zu\n",
- ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
- rqstp->rq_arg.head[0].iov_len);
-
- return ret;
}
/* By convention, backchannel calls arrive via rdma_msg type
@@ -612,7 +594,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
struct svc_rdma_op_ctxt *ctxt = NULL;
struct rpcrdma_msg *rmsgp;
int ret = 0;
- int len;
dprintk("svcrdma: rqstp=%p\n", rqstp);
@@ -623,7 +604,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
dto_q);
list_del_init(&ctxt->dto_q);
spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
- return rdma_read_complete(rqstp, ctxt);
+ rdma_read_complete(rqstp, ctxt);
+ goto complete;
} else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
struct svc_rdma_op_ctxt,
@@ -642,8 +624,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
* transport list
*/
if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
- goto close_out;
-
+ goto defer;
goto out;
}
dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
@@ -654,15 +635,13 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
/* Decode the RDMA header. */
- len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
- rqstp->rq_xprt_hlen = len;
-
- /* If the request is invalid, reply with an error */
- if (len < 0) {
- if (len == -ENOSYS)
- svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
- goto close_out;
- }
+ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+ ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg);
+ if (ret < 0)
+ goto out_err;
+ if (ret == 0)
+ goto out_drop;
+ rqstp->rq_xprt_hlen = ret;
if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
@@ -684,6 +663,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
return 0;
}
+complete:
ret = rqstp->rq_arg.head[0].iov_len
+ rqstp->rq_arg.page_len
+ rqstp->rq_arg.tail[0].iov_len;
@@ -698,26 +678,16 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
svc_xprt_copy_addrs(rqstp, xprt);
return ret;
- close_out:
- if (ctxt)
- svc_rdma_put_context(ctxt, 1);
- dprintk("svcrdma: transport %p is closing\n", xprt);
- /*
- * Set the close bit and enqueue it. svc_recv will see the
- * close bit and call svc_xprt_delete
- */
- set_bit(XPT_CLOSE, &xprt->xpt_flags);
+out_err:
+ svc_rdma_send_error(rdma_xprt, rmsgp, ret);
+ svc_rdma_put_context(ctxt, 0);
+ return 0;
+
defer:
return 0;
+out_drop:
+ svc_rdma_put_context(ctxt, 1);
repost:
- ret = svc_rdma_post_recv(rdma_xprt, GFP_KERNEL);
- if (ret) {
- pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
- ret);
- pr_err("svcrdma: closing transport %p.\n", rdma_xprt);
- set_bit(XPT_CLOSE, &rdma_xprt->sc_xprt.xpt_flags);
- ret = -ENOTCONN;
- }
- return ret;
+ return svc_rdma_repost_recv(rdma_xprt, GFP_KERNEL);
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index df57f3ce6cd2..54d533300620 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -50,9 +50,15 @@
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+static u32 xdr_padsize(u32 len)
+{
+ return (len & 3) ? (4 - (len & 3)) : 0;
+}
+
int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
struct xdr_buf *xdr,
- struct svc_rdma_req_map *vec)
+ struct svc_rdma_req_map *vec,
+ bool write_chunk_present)
{
int sge_no;
u32 sge_bytes;
@@ -92,9 +98,20 @@ int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
/* Tail SGE */
if (xdr->tail[0].iov_len) {
- vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
- vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
- sge_no++;
+ unsigned char *base = xdr->tail[0].iov_base;
+ size_t len = xdr->tail[0].iov_len;
+ u32 xdr_pad = xdr_padsize(xdr->page_len);
+
+ if (write_chunk_present && xdr_pad) {
+ base += xdr_pad;
+ len -= xdr_pad;
+ }
+
+ if (len) {
+ vec->sge[sge_no].iov_base = base;
+ vec->sge[sge_no].iov_len = len;
+ sge_no++;
+ }
}
dprintk("svcrdma: %s: sge_no %d page_no %d "
@@ -166,10 +183,10 @@ svc_rdma_get_write_array(struct rpcrdma_msg *rmsgp)
* reply array is present
*/
static struct rpcrdma_write_array *
-svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp)
+svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp,
+ struct rpcrdma_write_array *wr_ary)
{
struct rpcrdma_read_chunk *rch;
- struct rpcrdma_write_array *wr_ary;
struct rpcrdma_write_array *rp_ary;
/* XXX: Need to fix when reply chunk may occur with read list
@@ -191,7 +208,6 @@ svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp)
goto found_it;
}
- wr_ary = svc_rdma_get_write_array(rmsgp);
if (wr_ary) {
int chunk = be32_to_cpu(wr_ary->wc_nchunks);
@@ -281,8 +297,8 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
/* Prepare WRITE WR */
memset(&write_wr, 0, sizeof write_wr);
- ctxt->wr_op = IB_WR_RDMA_WRITE;
- write_wr.wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_wc_write;
+ write_wr.wr.wr_cqe = &ctxt->cqe;
write_wr.wr.sg_list = &sge[0];
write_wr.wr.num_sge = sge_no;
write_wr.wr.opcode = IB_WR_RDMA_WRITE;
@@ -298,41 +314,37 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
err:
svc_rdma_unmap_dma(ctxt);
svc_rdma_put_context(ctxt, 0);
- /* Fatal error, close transport */
return -EIO;
}
+noinline
static int send_write_chunks(struct svcxprt_rdma *xprt,
- struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_write_array *wr_ary,
struct rpcrdma_msg *rdma_resp,
struct svc_rqst *rqstp,
struct svc_rdma_req_map *vec)
{
- u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+ u32 xfer_len = rqstp->rq_res.page_len;
int write_len;
u32 xdr_off;
int chunk_off;
int chunk_no;
int nchunks;
- struct rpcrdma_write_array *arg_ary;
struct rpcrdma_write_array *res_ary;
int ret;
- arg_ary = svc_rdma_get_write_array(rdma_argp);
- if (!arg_ary)
- return 0;
res_ary = (struct rpcrdma_write_array *)
&rdma_resp->rm_body.rm_chunks[1];
/* Write chunks start at the pagelist */
- nchunks = be32_to_cpu(arg_ary->wc_nchunks);
+ nchunks = be32_to_cpu(wr_ary->wc_nchunks);
for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
xfer_len && chunk_no < nchunks;
chunk_no++) {
struct rpcrdma_segment *arg_ch;
u64 rs_offset;
- arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
+ arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));
/* Prepare the response chunk given the length actually
@@ -350,11 +362,8 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
xdr_off,
write_len,
vec);
- if (ret <= 0) {
- dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
- ret);
- return -EIO;
- }
+ if (ret <= 0)
+ goto out_err;
chunk_off += ret;
xdr_off += ret;
xfer_len -= ret;
@@ -364,11 +373,16 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
/* Update the req with the number of chunks actually used */
svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
- return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+ return rqstp->rq_res.page_len;
+
+out_err:
+ pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
+ return -EIO;
}
+noinline
static int send_reply_chunks(struct svcxprt_rdma *xprt,
- struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_write_array *rp_ary,
struct rpcrdma_msg *rdma_resp,
struct svc_rqst *rqstp,
struct svc_rdma_req_map *vec)
@@ -380,25 +394,21 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
int chunk_off;
int nchunks;
struct rpcrdma_segment *ch;
- struct rpcrdma_write_array *arg_ary;
struct rpcrdma_write_array *res_ary;
int ret;
- arg_ary = svc_rdma_get_reply_array(rdma_argp);
- if (!arg_ary)
- return 0;
/* XXX: need to fix when reply lists occur with read-list and or
* write-list */
res_ary = (struct rpcrdma_write_array *)
&rdma_resp->rm_body.rm_chunks[2];
/* xdr offset starts at RPC message */
- nchunks = be32_to_cpu(arg_ary->wc_nchunks);
+ nchunks = be32_to_cpu(rp_ary->wc_nchunks);
for (xdr_off = 0, chunk_no = 0;
xfer_len && chunk_no < nchunks;
chunk_no++) {
u64 rs_offset;
- ch = &arg_ary->wc_array[chunk_no].wc_target;
+ ch = &rp_ary->wc_array[chunk_no].wc_target;
write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
/* Prepare the reply chunk given the length actually
@@ -415,11 +425,8 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
xdr_off,
write_len,
vec);
- if (ret <= 0) {
- dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
- ret);
- return -EIO;
- }
+ if (ret <= 0)
+ goto out_err;
chunk_off += ret;
xdr_off += ret;
xfer_len -= ret;
@@ -430,6 +437,10 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
return rqstp->rq_res.len;
+
+out_err:
+ pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
+ return -EIO;
}
/* This function prepares the portion of the RPCRDMA message to be
@@ -452,30 +463,21 @@ static int send_reply(struct svcxprt_rdma *rdma,
struct svc_rqst *rqstp,
struct page *page,
struct rpcrdma_msg *rdma_resp,
- struct svc_rdma_op_ctxt *ctxt,
struct svc_rdma_req_map *vec,
int byte_count)
{
+ struct svc_rdma_op_ctxt *ctxt;
struct ib_send_wr send_wr;
u32 xdr_off;
int sge_no;
int sge_bytes;
int page_no;
int pages;
- int ret;
-
- /* Post a recv buffer to handle another request. */
- ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
- if (ret) {
- printk(KERN_INFO
- "svcrdma: could not post a receive buffer, err=%d."
- "Closing transport %p.\n", ret, rdma);
- set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
- svc_rdma_put_context(ctxt, 0);
- return -ENOTCONN;
- }
+ int ret = -EIO;
/* Prepare the context */
+ ctxt = svc_rdma_get_context(rdma);
+ ctxt->direction = DMA_TO_DEVICE;
ctxt->pages[0] = page;
ctxt->count = 1;
@@ -543,8 +545,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
goto err;
}
memset(&send_wr, 0, sizeof send_wr);
- ctxt->wr_op = IB_WR_SEND;
- send_wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_wc_send;
+ send_wr.wr_cqe = &ctxt->cqe;
send_wr.sg_list = ctxt->sge;
send_wr.num_sge = sge_no;
send_wr.opcode = IB_WR_SEND;
@@ -559,7 +561,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
err:
svc_rdma_unmap_dma(ctxt);
svc_rdma_put_context(ctxt, 1);
- return -EIO;
+ return ret;
}
void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
@@ -573,12 +575,11 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
container_of(xprt, struct svcxprt_rdma, sc_xprt);
struct rpcrdma_msg *rdma_argp;
struct rpcrdma_msg *rdma_resp;
- struct rpcrdma_write_array *reply_ary;
+ struct rpcrdma_write_array *wr_ary, *rp_ary;
enum rpcrdma_proc reply_type;
int ret;
int inline_bytes;
struct page *res_page;
- struct svc_rdma_op_ctxt *ctxt;
struct svc_rdma_req_map *vec;
dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
@@ -587,12 +588,12 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
* places this at the start of page 0.
*/
rdma_argp = page_address(rqstp->rq_pages[0]);
+ wr_ary = svc_rdma_get_write_array(rdma_argp);
+ rp_ary = svc_rdma_get_reply_array(rdma_argp, wr_ary);
/* Build an req vec for the XDR */
- ctxt = svc_rdma_get_context(rdma);
- ctxt->direction = DMA_TO_DEVICE;
vec = svc_rdma_get_req_map(rdma);
- ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec);
+ ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
if (ret)
goto err0;
inline_bytes = rqstp->rq_res.len;
@@ -603,8 +604,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (!res_page)
goto err0;
rdma_resp = page_address(res_page);
- reply_ary = svc_rdma_get_reply_array(rdma_argp);
- if (reply_ary)
+ if (rp_ary)
reply_type = RDMA_NOMSG;
else
reply_type = RDMA_MSG;
@@ -612,27 +612,31 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
rdma_resp, reply_type);
/* Send any write-chunk data and build resp write-list */
- ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
- rqstp, vec);
- if (ret < 0) {
- printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
- ret);
- goto err1;
+ if (wr_ary) {
+ ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
+ if (ret < 0)
+ goto err1;
+ inline_bytes -= ret + xdr_padsize(ret);
}
- inline_bytes -= ret;
/* Send any reply-list data and update resp reply-list */
- ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
- rqstp, vec);
- if (ret < 0) {
- printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
- ret);
- goto err1;
+ if (rp_ary) {
+ ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
+ if (ret < 0)
+ goto err1;
+ inline_bytes -= ret;
}
- inline_bytes -= ret;
- ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec,
+ /* Post a fresh Receive buffer _before_ sending the reply */
+ ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
+ if (ret)
+ goto err1;
+
+ ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec,
inline_bytes);
+ if (ret < 0)
+ goto err1;
+
svc_rdma_put_req_map(rdma, vec);
dprintk("svcrdma: send_reply returns %d\n", ret);
return ret;
@@ -641,6 +645,70 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
put_page(res_page);
err0:
svc_rdma_put_req_map(rdma, vec);
- svc_rdma_put_context(ctxt, 0);
- return ret;
+ pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
+ ret);
+ set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+ return -ENOTCONN;
+}
+
+void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+ int status)
+{
+ struct ib_send_wr err_wr;
+ struct page *p;
+ struct svc_rdma_op_ctxt *ctxt;
+ enum rpcrdma_errcode err;
+ __be32 *va;
+ int length;
+ int ret;
+
+ ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
+ if (ret)
+ return;
+
+ p = alloc_page(GFP_KERNEL);
+ if (!p)
+ return;
+ va = page_address(p);
+
+ /* XDR encode an error reply */
+ err = ERR_CHUNK;
+ if (status == -EPROTONOSUPPORT)
+ err = ERR_VERS;
+ length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->direction = DMA_TO_DEVICE;
+ ctxt->count = 1;
+ ctxt->pages[0] = p;
+
+ /* Prepare SGE for local address */
+ ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
+ ctxt->sge[0].length = length;
+ ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
+ p, 0, length, DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
+ dprintk("svcrdma: Error mapping buffer for protocol error\n");
+ svc_rdma_put_context(ctxt, 1);
+ return;
+ }
+ atomic_inc(&xprt->sc_dma_used);
+
+ /* Prepare SEND WR */
+ memset(&err_wr, 0, sizeof(err_wr));
+ ctxt->cqe.done = svc_rdma_wc_send;
+ err_wr.wr_cqe = &ctxt->cqe;
+ err_wr.sg_list = ctxt->sge;
+ err_wr.num_sge = 1;
+ err_wr.opcode = IB_WR_SEND;
+ err_wr.send_flags = IB_SEND_SIGNALED;
+
+ /* Post It */
+ ret = svc_rdma_send(xprt, &err_wr);
+ if (ret) {
+ dprintk("svcrdma: Error %d posting send for protocol error\n",
+ ret);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 1);
+ }
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 5763825d09bf..dd9440137834 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -63,17 +63,10 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
-static void dto_tasklet_func(unsigned long data);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);
-static void rq_cq_reap(struct svcxprt_rdma *xprt);
-static void sq_cq_reap(struct svcxprt_rdma *xprt);
-
-static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
-static DEFINE_SPINLOCK(dto_lock);
-static LIST_HEAD(dto_xprt_q);
static struct svc_xprt_ops svc_rdma_ops = {
.xpo_create = svc_rdma_create,
@@ -352,15 +345,6 @@ static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
}
}
-/* ib_cq event handler */
-static void cq_event_handler(struct ib_event *event, void *context)
-{
- struct svc_xprt *xprt = context;
- dprintk("svcrdma: received CQ event %s (%d), context=%p\n",
- ib_event_msg(event->event), event->event, context);
- set_bit(XPT_CLOSE, &xprt->xpt_flags);
-}
-
/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
@@ -392,251 +376,171 @@ static void qp_event_handler(struct ib_event *event, void *context)
}
}
-/*
- * Data Transfer Operation Tasklet
+/**
+ * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
+ * @cq: completion queue
+ * @wc: completed WR
*
- * Walks a list of transports with I/O pending, removing entries as
- * they are added to the server's I/O pending list. Two bits indicate
- * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
- * spinlock that serializes access to the transport list with the RQ
- * and SQ interrupt handlers.
*/
-static void dto_tasklet_func(unsigned long data)
+static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
- struct svcxprt_rdma *xprt;
- unsigned long flags;
+ struct svcxprt_rdma *xprt = cq->cq_context;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_op_ctxt *ctxt;
- spin_lock_irqsave(&dto_lock, flags);
- while (!list_empty(&dto_xprt_q)) {
- xprt = list_entry(dto_xprt_q.next,
- struct svcxprt_rdma, sc_dto_q);
- list_del_init(&xprt->sc_dto_q);
- spin_unlock_irqrestore(&dto_lock, flags);
+ /* WARNING: Only wc->wr_cqe and wc->status are reliable */
+ ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+ ctxt->wc_status = wc->status;
+ svc_rdma_unmap_dma(ctxt);
- rq_cq_reap(xprt);
- sq_cq_reap(xprt);
+ if (wc->status != IB_WC_SUCCESS)
+ goto flushed;
- svc_xprt_put(&xprt->sc_xprt);
- spin_lock_irqsave(&dto_lock, flags);
- }
- spin_unlock_irqrestore(&dto_lock, flags);
+ /* All wc fields are now known to be valid */
+ ctxt->byte_len = wc->byte_len;
+ spin_lock(&xprt->sc_rq_dto_lock);
+ list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+ spin_unlock(&xprt->sc_rq_dto_lock);
+
+ set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+ goto out;
+ svc_xprt_enqueue(&xprt->sc_xprt);
+ goto out;
+
+flushed:
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ svc_rdma_put_context(ctxt, 1);
+
+out:
+ svc_xprt_put(&xprt->sc_xprt);
}
-/*
- * Receive Queue Completion Handler
- *
- * Since an RQ completion handler is called on interrupt context, we
- * need to defer the handling of the I/O to a tasklet
- */
-static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
+static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
+ struct ib_wc *wc,
+ const char *opname)
{
- struct svcxprt_rdma *xprt = cq_context;
- unsigned long flags;
-
- /* Guard against unconditional flush call for destroyed QP */
- if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
- return;
+ if (wc->status != IB_WC_SUCCESS)
+ goto err;
- /*
- * Set the bit regardless of whether or not it's on the list
- * because it may be on the list already due to an SQ
- * completion.
- */
- set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
+out:
+ atomic_dec(&xprt->sc_sq_count);
+ wake_up(&xprt->sc_send_wait);
+ return;
+
+err:
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("svcrdma: %s: %s (%u/0x%x)\n",
+ opname, ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
+ goto out;
+}
- /*
- * If this transport is not already on the DTO transport queue,
- * add it
- */
- spin_lock_irqsave(&dto_lock, flags);
- if (list_empty(&xprt->sc_dto_q)) {
- svc_xprt_get(&xprt->sc_xprt);
- list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
- }
- spin_unlock_irqrestore(&dto_lock, flags);
+static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
+ const char *opname)
+{
+ struct svcxprt_rdma *xprt = cq->cq_context;
- /* Tasklet does all the work to avoid irqsave locks. */
- tasklet_schedule(&dto_tasklet);
+ svc_rdma_send_wc_common(xprt, wc, opname);
+ svc_xprt_put(&xprt->sc_xprt);
}
-/*
- * rq_cq_reap - Process the RQ CQ.
- *
- * Take all completing WC off the CQE and enqueue the associated DTO
- * context on the dto_q for the transport.
+/**
+ * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: completion queue
+ * @wc: completed WR
*
- * Note that caller must hold a transport reference.
*/
-static void rq_cq_reap(struct svcxprt_rdma *xprt)
+void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
- int ret;
- struct ib_wc wc;
- struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_op_ctxt *ctxt;
- if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
- return;
+ svc_rdma_send_wc_common_put(cq, wc, "send");
- ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
- atomic_inc(&rdma_stat_rq_poll);
+ ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 1);
+}
- while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
- ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
- ctxt->wc_status = wc.status;
- ctxt->byte_len = wc.byte_len;
- svc_rdma_unmap_dma(ctxt);
- if (wc.status != IB_WC_SUCCESS) {
- /* Close the transport */
- dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- svc_rdma_put_context(ctxt, 1);
- svc_xprt_put(&xprt->sc_xprt);
- continue;
- }
- spin_lock_bh(&xprt->sc_rq_dto_lock);
- list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
- spin_unlock_bh(&xprt->sc_rq_dto_lock);
- svc_xprt_put(&xprt->sc_xprt);
- }
+/**
+ * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_op_ctxt *ctxt;
- if (ctxt)
- atomic_inc(&rdma_stat_rq_prod);
+ svc_rdma_send_wc_common_put(cq, wc, "write");
- set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
- /*
- * If data arrived before established event,
- * don't enqueue. This defers RPC I/O until the
- * RDMA connection is complete.
- */
- if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
- svc_xprt_enqueue(&xprt->sc_xprt);
+ ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 0);
}
-/*
- * Process a completion context
+/**
+ * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
*/
-static void process_context(struct svcxprt_rdma *xprt,
- struct svc_rdma_op_ctxt *ctxt)
+void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
{
- struct svc_rdma_op_ctxt *read_hdr;
- int free_pages = 0;
-
- svc_rdma_unmap_dma(ctxt);
+ svc_rdma_send_wc_common_put(cq, wc, "fastreg");
+}
- switch (ctxt->wr_op) {
- case IB_WR_SEND:
- free_pages = 1;
- break;
+/**
+ * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct svcxprt_rdma *xprt = cq->cq_context;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_op_ctxt *ctxt;
- case IB_WR_RDMA_WRITE:
- break;
+ svc_rdma_send_wc_common(xprt, wc, "read");
- case IB_WR_RDMA_READ:
- case IB_WR_RDMA_READ_WITH_INV:
- svc_rdma_put_frmr(xprt, ctxt->frmr);
+ ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_frmr(xprt, ctxt->frmr);
- if (!test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags))
- break;
+ if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+ struct svc_rdma_op_ctxt *read_hdr;
read_hdr = ctxt->read_hdr;
- svc_rdma_put_context(ctxt, 0);
-
- spin_lock_bh(&xprt->sc_rq_dto_lock);
- set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ spin_lock(&xprt->sc_rq_dto_lock);
list_add_tail(&read_hdr->dto_q,
&xprt->sc_read_complete_q);
- spin_unlock_bh(&xprt->sc_rq_dto_lock);
- svc_xprt_enqueue(&xprt->sc_xprt);
- return;
+ spin_unlock(&xprt->sc_rq_dto_lock);
- default:
- dprintk("svcrdma: unexpected completion opcode=%d\n",
- ctxt->wr_op);
- break;
+ set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ svc_xprt_enqueue(&xprt->sc_xprt);
}
- svc_rdma_put_context(ctxt, free_pages);
+ svc_rdma_put_context(ctxt, 0);
+ svc_xprt_put(&xprt->sc_xprt);
}
-/*
- * Send Queue Completion Handler - potentially called on interrupt context.
+/**
+ * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
+ * @cq: completion queue
+ * @wc: completed WR
*
- * Note that caller must hold a transport reference.
*/
-static void sq_cq_reap(struct svcxprt_rdma *xprt)
-{
- struct svc_rdma_op_ctxt *ctxt = NULL;
- struct ib_wc wc_a[6];
- struct ib_wc *wc;
- struct ib_cq *cq = xprt->sc_sq_cq;
- int ret;
-
- memset(wc_a, 0, sizeof(wc_a));
-
- if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
- return;
-
- ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
- atomic_inc(&rdma_stat_sq_poll);
- while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
- int i;
-
- for (i = 0; i < ret; i++) {
- wc = &wc_a[i];
- if (wc->status != IB_WC_SUCCESS) {
- dprintk("svcrdma: sq wc err status %s (%d)\n",
- ib_wc_status_msg(wc->status),
- wc->status);
-
- /* Close the transport */
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- }
-
- /* Decrement used SQ WR count */
- atomic_dec(&xprt->sc_sq_count);
- wake_up(&xprt->sc_send_wait);
-
- ctxt = (struct svc_rdma_op_ctxt *)
- (unsigned long)wc->wr_id;
- if (ctxt)
- process_context(xprt, ctxt);
-
- svc_xprt_put(&xprt->sc_xprt);
- }
- }
-
- if (ctxt)
- atomic_inc(&rdma_stat_sq_prod);
-}
-
-static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
{
- struct svcxprt_rdma *xprt = cq_context;
- unsigned long flags;
-
- /* Guard against unconditional flush call for destroyed QP */
- if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
- return;
-
- /*
- * Set the bit regardless of whether or not it's on the list
- * because it may be on the list already due to an RQ
- * completion.
- */
- set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
-
- /*
- * If this transport is not already on the DTO transport queue,
- * add it
- */
- spin_lock_irqsave(&dto_lock, flags);
- if (list_empty(&xprt->sc_dto_q)) {
- svc_xprt_get(&xprt->sc_xprt);
- list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
- }
- spin_unlock_irqrestore(&dto_lock, flags);
-
- /* Tasklet does all the work to avoid irqsave locks. */
- tasklet_schedule(&dto_tasklet);
+ svc_rdma_send_wc_common_put(cq, wc, "localInv");
}
static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
@@ -681,6 +585,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
ctxt = svc_rdma_get_context(xprt);
buflen = 0;
ctxt->direction = DMA_FROM_DEVICE;
+ ctxt->cqe.done = svc_rdma_wc_receive;
for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
if (sge_no >= xprt->sc_max_sge) {
pr_err("svcrdma: Too many sges (%d)\n", sge_no);
@@ -705,7 +610,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
recv_wr.next = NULL;
recv_wr.sg_list = &ctxt->sge[0];
recv_wr.num_sge = ctxt->count;
- recv_wr.wr_id = (u64)(unsigned long)ctxt;
+ recv_wr.wr_cqe = &ctxt->cqe;
svc_xprt_get(&xprt->sc_xprt);
ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
@@ -722,6 +627,21 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
return -ENOMEM;
}
+int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
+{
+ int ret = 0;
+
+ ret = svc_rdma_post_recv(xprt, flags);
+ if (ret) {
+ pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
+ ret);
+ pr_err("svcrdma: closing transport %p.\n", xprt);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ ret = -ENOTCONN;
+ }
+ return ret;
+}
+
/*
* This function handles the CONNECT_REQUEST event on a listening
* endpoint. It is passed the cma_id for the _new_ connection. The context in
@@ -869,7 +789,7 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
int ret;
dprintk("svcrdma: Creating RDMA socket\n");
- if (sa->sa_family != AF_INET) {
+ if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) {
dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family);
return ERR_PTR(-EAFNOSUPPORT);
}
@@ -885,6 +805,16 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
goto err0;
}
+ /* Allow both IPv4 and IPv6 sockets to bind a single port
+ * at the same time.
+ */
+#if IS_ENABLED(CONFIG_IPV6)
+ ret = rdma_set_afonly(listen_id, 1);
+ if (ret) {
+ dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret);
+ goto err1;
+ }
+#endif
ret = rdma_bind_addr(listen_id, sa);
if (ret) {
dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
@@ -1011,7 +941,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
struct svcxprt_rdma *listen_rdma;
struct svcxprt_rdma *newxprt = NULL;
struct rdma_conn_param conn_param;
- struct ib_cq_init_attr cq_attr = {};
struct ib_qp_init_attr qp_attr;
struct ib_device *dev;
unsigned int i;
@@ -1069,22 +998,14 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
dprintk("svcrdma: error creating PD for connect request\n");
goto errout;
}
- cq_attr.cqe = newxprt->sc_sq_depth;
- newxprt->sc_sq_cq = ib_create_cq(dev,
- sq_comp_handler,
- cq_event_handler,
- newxprt,
- &cq_attr);
+ newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
+ 0, IB_POLL_SOFTIRQ);
if (IS_ERR(newxprt->sc_sq_cq)) {
dprintk("svcrdma: error creating SQ CQ for connect request\n");
goto errout;
}
- cq_attr.cqe = newxprt->sc_rq_depth;
- newxprt->sc_rq_cq = ib_create_cq(dev,
- rq_comp_handler,
- cq_event_handler,
- newxprt,
- &cq_attr);
+ newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
+ 0, IB_POLL_SOFTIRQ);
if (IS_ERR(newxprt->sc_rq_cq)) {
dprintk("svcrdma: error creating RQ CQ for connect request\n");
goto errout;
@@ -1162,7 +1083,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
/* Post receive buffers */
- for (i = 0; i < newxprt->sc_rq_depth; i++) {
+ for (i = 0; i < newxprt->sc_max_requests; i++) {
ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
if (ret) {
dprintk("svcrdma: failure posting receive buffers\n");
@@ -1173,13 +1094,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
/* Swap out the handler */
newxprt->sc_cm_id->event_handler = rdma_cma_handler;
- /*
- * Arm the CQs for the SQ and RQ before accepting so we can't
- * miss the first message
- */
- ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
- ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-
/* Accept Connection */
set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
memset(&conn_param, 0, sizeof conn_param);
@@ -1266,6 +1180,9 @@ static void __svc_rdma_free(struct work_struct *work)
dprintk("svcrdma: %s(%p)\n", __func__, rdma);
+ if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
+ ib_drain_qp(rdma->sc_qp);
+
/* We should only be called from kref_put */
if (atomic_read(&xprt->xpt_ref.refcount) != 0)
pr_err("svcrdma: sc_xprt still in use? (%d)\n",
@@ -1319,10 +1236,10 @@ static void __svc_rdma_free(struct work_struct *work)
ib_destroy_qp(rdma->sc_qp);
if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
- ib_destroy_cq(rdma->sc_sq_cq);
+ ib_free_cq(rdma->sc_sq_cq);
if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
- ib_destroy_cq(rdma->sc_rq_cq);
+ ib_free_cq(rdma->sc_rq_cq);
if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
ib_dealloc_pd(rdma->sc_pd);
@@ -1383,9 +1300,6 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
spin_unlock_bh(&xprt->sc_lock);
atomic_inc(&rdma_stat_sq_starve);
- /* See if we can opportunistically reap SQ WR to make room */
- sq_cq_reap(xprt);
-
/* Wait until SQ WR available if SQ still full */
wait_event(xprt->sc_send_wait,
atomic_read(&xprt->sc_sq_count) <
@@ -1418,57 +1332,3 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
}
return ret;
}
-
-void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
- enum rpcrdma_errcode err)
-{
- struct ib_send_wr err_wr;
- struct page *p;
- struct svc_rdma_op_ctxt *ctxt;
- __be32 *va;
- int length;
- int ret;
-
- p = alloc_page(GFP_KERNEL);
- if (!p)
- return;
- va = page_address(p);
-
- /* XDR encode error */
- length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
-
- ctxt = svc_rdma_get_context(xprt);
- ctxt->direction = DMA_FROM_DEVICE;
- ctxt->count = 1;
- ctxt->pages[0] = p;
-
- /* Prepare SGE for local address */
- ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
- p, 0, length, DMA_FROM_DEVICE);
- if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
- put_page(p);
- svc_rdma_put_context(ctxt, 1);
- return;
- }
- atomic_inc(&xprt->sc_dma_used);
- ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
- ctxt->sge[0].length = length;
-
- /* Prepare SEND WR */
- memset(&err_wr, 0, sizeof err_wr);
- ctxt->wr_op = IB_WR_SEND;
- err_wr.wr_id = (unsigned long)ctxt;
- err_wr.sg_list = ctxt->sge;
- err_wr.num_sge = 1;
- err_wr.opcode = IB_WR_SEND;
- err_wr.send_flags = IB_SEND_SIGNALED;
-
- /* Post It */
- ret = svc_rdma_send(xprt, &err_wr);
- if (ret) {
- dprintk("svcrdma: Error %d posting send for protocol error\n",
- ret);
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
- }
-}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index b1b009f10ea3..99d2e5b72726 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -73,6 +73,8 @@ static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
+static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
+static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
static unsigned int zero;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
@@ -96,6 +98,8 @@ static struct ctl_table xr_tunables_table[] = {
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec,
+ .extra1 = &min_inline_size,
+ .extra2 = &max_inline_size,
},
{
.procname = "rdma_max_inline_write",
@@ -103,6 +107,8 @@ static struct ctl_table xr_tunables_table[] = {
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec,
+ .extra1 = &min_inline_size,
+ .extra2 = &max_inline_size,
},
{
.procname = "rdma_inline_write_padding",
@@ -508,6 +514,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
+ req->rl_task = task;
return req->rl_sendbuf->rg_base;
out_rdmabuf:
@@ -564,7 +571,6 @@ xprt_rdma_free(void *buffer)
struct rpcrdma_req *req;
struct rpcrdma_xprt *r_xprt;
struct rpcrdma_regbuf *rb;
- int i;
if (buffer == NULL)
return;
@@ -578,11 +584,8 @@ xprt_rdma_free(void *buffer)
dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
- for (i = 0; req->rl_nchunks;) {
- --req->rl_nchunks;
- i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
- &req->rl_segments[i]);
- }
+ r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req,
+ !RPC_IS_ASYNC(req->rl_task));
rpcrdma_buffer_put(req);
}
@@ -707,6 +710,7 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
.bc_setup = xprt_rdma_bc_setup,
.bc_up = xprt_rdma_bc_up,
+ .bc_maxpayload = xprt_rdma_bc_maxpayload,
.bc_free_rqst = xprt_rdma_bc_free_rqst,
.bc_destroy = xprt_rdma_bc_destroy,
#endif
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 878f1bfb1db9..b044d98a1370 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -112,89 +112,65 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
}
}
+/**
+ * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
static void
-rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
+rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
- struct rpcrdma_ep *ep = context;
-
- pr_err("RPC: %s: %s on device %s ep %p\n",
- __func__, ib_event_msg(event->event),
- event->device->name, context);
- if (ep->rep_connected == 1) {
- ep->rep_connected = -EIO;
- rpcrdma_conn_func(ep);
- wake_up_all(&ep->rep_connect_wait);
- }
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
}
static void
-rpcrdma_sendcq_process_wc(struct ib_wc *wc)
+rpcrdma_receive_worker(struct work_struct *work)
{
- /* WARNING: Only wr_id and status are reliable at this point */
- if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
- if (wc->status != IB_WC_SUCCESS &&
- wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("RPC: %s: SEND: %s\n",
- __func__, ib_wc_status_msg(wc->status));
- } else {
- struct rpcrdma_mw *r;
+ struct rpcrdma_rep *rep =
+ container_of(work, struct rpcrdma_rep, rr_work);
- r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- r->mw_sendcompletion(wc);
- }
+ rpcrdma_reply_handler(rep);
}
-/* The common case is a single send completion is waiting. By
- * passing two WC entries to ib_poll_cq, a return code of 1
- * means there is exactly one WC waiting and no more. We don't
- * have to invoke ib_poll_cq again to know that the CQ has been
- * properly drained.
+/* Perform basic sanity checking to avoid using garbage
+ * to update the credit grant value.
*/
static void
-rpcrdma_sendcq_poll(struct ib_cq *cq)
+rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
- struct ib_wc *pos, wcs[2];
- int count, rc;
+ struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
+ struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
+ u32 credits;
- do {
- pos = wcs;
+ if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
+ return;
- rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
- if (rc < 0)
- break;
+ credits = be32_to_cpu(rmsgp->rm_credit);
+ if (credits == 0)
+ credits = 1; /* don't deadlock */
+ else if (credits > buffer->rb_max_requests)
+ credits = buffer->rb_max_requests;
- count = rc;
- while (count-- > 0)
- rpcrdma_sendcq_process_wc(pos++);
- } while (rc == ARRAY_SIZE(wcs));
- return;
+ atomic_set(&buffer->rb_credits, credits);
}
-/* Handle provider send completion upcalls.
+/**
+ * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
*/
static void
-rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
-{
- do {
- rpcrdma_sendcq_poll(cq);
- } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
- IB_CQ_REPORT_MISSED_EVENTS) > 0);
-}
-
-static void
-rpcrdma_receive_worker(struct work_struct *work)
+rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
- struct rpcrdma_rep *rep =
- container_of(work, struct rpcrdma_rep, rr_work);
-
- rpcrdma_reply_handler(rep);
-}
-
-static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc)
-{
- struct rpcrdma_rep *rep =
- (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
+ rr_cqe);
/* WARNING: Only wr_cqe and status are reliable at this point */
if (wc->status != IB_WC_SUCCESS)
@@ -211,7 +187,8 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc)
ib_dma_sync_single_for_cpu(rep->rr_device,
rdmab_addr(rep->rr_rdmabuf),
rep->rr_len, DMA_FROM_DEVICE);
- prefetch(rdmab_to_msg(rep->rr_rdmabuf));
+
+ rpcrdma_update_granted_credits(rep);
out_schedule:
queue_work(rpcrdma_receive_wq, &rep->rr_work);
@@ -219,59 +196,13 @@ out_schedule:
out_fail:
if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("RPC: %s: rep %p: %s\n",
- __func__, rep, ib_wc_status_msg(wc->status));
+ pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
rep->rr_len = RPCRDMA_BAD_LEN;
goto out_schedule;
}
-/* The wc array is on stack: automatic memory is always CPU-local.
- *
- * struct ib_wc is 64 bytes, making the poll array potentially
- * large. But this is at the bottom of the call chain. Further
- * substantial work is done in another thread.
- */
-static void
-rpcrdma_recvcq_poll(struct ib_cq *cq)
-{
- struct ib_wc *pos, wcs[4];
- int count, rc;
-
- do {
- pos = wcs;
-
- rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
- if (rc < 0)
- break;
-
- count = rc;
- while (count-- > 0)
- rpcrdma_recvcq_process_wc(pos++);
- } while (rc == ARRAY_SIZE(wcs));
-}
-
-/* Handle provider receive completion upcalls.
- */
-static void
-rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
-{
- do {
- rpcrdma_recvcq_poll(cq);
- } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
- IB_CQ_REPORT_MISSED_EVENTS) > 0);
-}
-
-static void
-rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
-{
- struct ib_wc wc;
-
- while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
- rpcrdma_recvcq_process_wc(&wc);
- while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
- rpcrdma_sendcq_process_wc(&wc);
-}
-
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
@@ -330,6 +261,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
connected:
dprintk("RPC: %s: %sconnected\n",
__func__, connstate > 0 ? "" : "dis");
+ atomic_set(&xprt->rx_buf.rb_credits, 1);
ep->rep_connected = connstate;
rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
@@ -433,23 +365,6 @@ out:
}
/*
- * Drain any cq, prior to teardown.
- */
-static void
-rpcrdma_clean_cq(struct ib_cq *cq)
-{
- struct ib_wc wc;
- int count = 0;
-
- while (1 == ib_poll_cq(cq, 1, &wc))
- ++count;
-
- if (count)
- dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
- __func__, count, wc.opcode);
-}
-
-/*
* Exported functions.
*/
@@ -518,7 +433,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
dprintk("RPC: %s: memory registration strategy is '%s'\n",
__func__, ia->ri_ops->ro_displayname);
- rwlock_init(&ia->ri_qplock);
return 0;
out3:
@@ -560,9 +474,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct rpcrdma_create_data_internal *cdata)
{
struct ib_cq *sendcq, *recvcq;
- struct ib_cq_init_attr cq_attr = {};
unsigned int max_qp_wr;
- int rc, err;
+ int rc;
if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
dprintk("RPC: %s: insufficient sge's available\n",
@@ -575,7 +488,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
__func__);
return -ENOMEM;
}
- max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS;
+ max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;
/* check provider's send/recv wr limits */
if (cdata->max_requests > max_qp_wr)
@@ -586,11 +499,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
ep->rep_attr.srq = NULL;
ep->rep_attr.cap.max_send_wr = cdata->max_requests;
ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
+ ep->rep_attr.cap.max_send_wr += 1; /* drain cqe */
rc = ia->ri_ops->ro_open(ia, ep, cdata);
if (rc)
return rc;
ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
+ ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */
ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
ep->rep_attr.cap.max_recv_sge = 1;
ep->rep_attr.cap.max_inline_data = 0;
@@ -614,9 +529,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
init_waitqueue_head(&ep->rep_connect_wait);
INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
- cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
- sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
- rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+ sendcq = ib_alloc_cq(ia->ri_device, NULL,
+ ep->rep_attr.cap.max_send_wr + 1,
+ 0, IB_POLL_SOFTIRQ);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -624,16 +539,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out1;
}
- rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
- if (rc) {
- dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
- __func__, rc);
- goto out2;
- }
-
- cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
- recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
- rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+ recvcq = ib_alloc_cq(ia->ri_device, NULL,
+ ep->rep_attr.cap.max_recv_wr + 1,
+ 0, IB_POLL_SOFTIRQ);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -641,18 +549,11 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out2;
}
- rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
- if (rc) {
- dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
- __func__, rc);
- ib_destroy_cq(recvcq);
- goto out2;
- }
-
ep->rep_attr.send_cq = sendcq;
ep->rep_attr.recv_cq = recvcq;
/* Initialize cma parameters */
+ memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
/* RPC/RDMA does not use private data */
ep->rep_remote_cma.private_data = NULL;
@@ -666,17 +567,23 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
ep->rep_remote_cma.responder_resources =
ia->ri_device->attrs.max_qp_rd_atom;
- ep->rep_remote_cma.retry_count = 7;
+ /* Limit transport retries so client can detect server
+ * GID changes quickly. RPC layer handles re-establishing
+ * transport connection and retransmission.
+ */
+ ep->rep_remote_cma.retry_count = 6;
+
+ /* RPC-over-RDMA handles its own flow control. In addition,
+ * make all RNR NAKs visible so we know that RPC-over-RDMA
+ * flow control is working correctly (no NAKs should be seen).
+ */
ep->rep_remote_cma.flow_control = 0;
ep->rep_remote_cma.rnr_retry_count = 0;
return 0;
out2:
- err = ib_destroy_cq(sendcq);
- if (err)
- dprintk("RPC: %s: ib_destroy_cq returned %i\n",
- __func__, err);
+ ib_free_cq(sendcq);
out1:
if (ia->ri_dma_mr)
ib_dereg_mr(ia->ri_dma_mr);
@@ -700,26 +607,14 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
cancel_delayed_work_sync(&ep->rep_connect_worker);
- if (ia->ri_id->qp)
- rpcrdma_ep_disconnect(ep, ia);
-
- rpcrdma_clean_cq(ep->rep_attr.recv_cq);
- rpcrdma_clean_cq(ep->rep_attr.send_cq);
-
if (ia->ri_id->qp) {
+ rpcrdma_ep_disconnect(ep, ia);
rdma_destroy_qp(ia->ri_id);
ia->ri_id->qp = NULL;
}
- rc = ib_destroy_cq(ep->rep_attr.recv_cq);
- if (rc)
- dprintk("RPC: %s: ib_destroy_cq returned %i\n",
- __func__, rc);
-
- rc = ib_destroy_cq(ep->rep_attr.send_cq);
- if (rc)
- dprintk("RPC: %s: ib_destroy_cq returned %i\n",
- __func__, rc);
+ ib_free_cq(ep->rep_attr.recv_cq);
+ ib_free_cq(ep->rep_attr.send_cq);
if (ia->ri_dma_mr) {
rc = ib_dereg_mr(ia->ri_dma_mr);
@@ -744,7 +639,6 @@ retry:
dprintk("RPC: %s: reconnecting...\n", __func__);
rpcrdma_ep_disconnect(ep, ia);
- rpcrdma_flush_cqs(ep);
xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
id = rpcrdma_create_id(xprt, ia,
@@ -777,10 +671,8 @@ retry:
goto out;
}
- write_lock(&ia->ri_qplock);
old = ia->ri_id;
ia->ri_id = id;
- write_unlock(&ia->ri_qplock);
rdma_destroy_qp(old);
rpcrdma_destroy_id(old);
@@ -870,7 +762,6 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
int rc;
- rpcrdma_flush_cqs(ep);
rc = rdma_disconnect(ia->ri_id);
if (!rc) {
/* returns without wait if not connected */
@@ -882,6 +773,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
ep->rep_connected = rc;
}
+
+ ib_drain_qp(ia->ri_id->qp);
}
struct rpcrdma_req *
@@ -898,6 +791,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
spin_lock(&buffer->rb_reqslock);
list_add(&req->rl_all, &buffer->rb_allreqs);
spin_unlock(&buffer->rb_reqslock);
+ req->rl_cqe.done = rpcrdma_wc_send;
req->rl_buffer = &r_xprt->rx_buf;
return req;
}
@@ -923,6 +817,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
}
rep->rr_device = ia->ri_device;
+ rep->rr_cqe.done = rpcrdma_receive_wc;
rep->rr_rxprt = r_xprt;
INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
return rep;
@@ -943,6 +838,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
buf->rb_max_requests = r_xprt->rx_data.max_requests;
buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_lock);
+ atomic_set(&buf->rb_credits, 1);
rc = ia->ri_ops->ro_init(r_xprt);
if (rc)
@@ -1259,7 +1155,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
}
send_wr.next = NULL;
- send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
+ send_wr.wr_cqe = &req->rl_cqe;
send_wr.sg_list = iov;
send_wr.num_sge = req->rl_niovs;
send_wr.opcode = IB_WR_SEND;
@@ -1297,7 +1193,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
int rc;
recv_wr.next = NULL;
- recv_wr.wr_id = (u64) (unsigned long) rep;
+ recv_wr.wr_cqe = &rep->rr_cqe;
recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;
@@ -1353,25 +1249,3 @@ out_rc:
rpcrdma_recv_buffer_put(rep);
return rc;
}
-
-/* How many chunk list items fit within our inline buffers?
- */
-unsigned int
-rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
-{
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- int bytes, segments;
-
- bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
- bytes -= RPCRDMA_HDRLEN_MIN;
- if (bytes < sizeof(struct rpcrdma_segment) * 2) {
- pr_warn("RPC: %s: inline threshold too small\n",
- __func__);
- return 0;
- }
-
- segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
- dprintk("RPC: %s: max chunk list size = %d segments\n",
- __func__, segments);
- return segments;
-}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 38fe11b09875..95cdc66225ee 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -65,7 +65,6 @@
*/
struct rpcrdma_ia {
const struct rpcrdma_memreg_ops *ri_ops;
- rwlock_t ri_qplock;
struct ib_device *ri_device;
struct rdma_cm_id *ri_id;
struct ib_pd *ri_pd;
@@ -73,6 +72,8 @@ struct rpcrdma_ia {
struct completion ri_done;
int ri_async_rc;
unsigned int ri_max_frmr_depth;
+ unsigned int ri_max_inline_write;
+ unsigned int ri_max_inline_read;
struct ib_qp_attr ri_qp_attr;
struct ib_qp_init_attr ri_qp_init_attr;
};
@@ -95,10 +96,6 @@ struct rpcrdma_ep {
#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
-/* Force completion handler to ignore the signal
- */
-#define RPCRDMA_IGNORE_COMPLETION (0ULL)
-
/* Pre-allocate extra Work Requests for handling backward receives
* and sends. This is a fixed value because the Work Queues are
* allocated when the forward channel is set up.
@@ -148,6 +145,26 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
#define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN)
+/* To ensure a transport can always make forward progress,
+ * the number of RDMA segments allowed in header chunk lists
+ * is capped at 8. This prevents less-capable devices and
+ * memory registrations from overrunning the Send buffer
+ * while building chunk lists.
+ *
+ * Elements of the Read list take up more room than the
+ * Write list or Reply chunk. 8 read segments means the Read
+ * list (or Write list or Reply chunk) cannot consume more
+ * than
+ *
+ * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes.
+ *
+ * And the fixed part of the header is another 24 bytes.
+ *
+ * The smallest inline threshold is 1024 bytes, ensuring that
+ * at least 750 bytes are available for RPC messages.
+ */
+#define RPCRDMA_MAX_HDR_SEGS (8)
+
/*
* struct rpcrdma_rep -- this structure encapsulates state required to recv
* and complete a reply, asynchronously. It needs several pieces of
@@ -166,11 +183,14 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
*/
#define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE)
-#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
+
+/* data segments + head/tail for Call + head/tail for Reply */
+#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 4)
struct rpcrdma_buffer;
struct rpcrdma_rep {
+ struct ib_cqe rr_cqe;
unsigned int rr_len;
struct ib_device *rr_device;
struct rpcrdma_xprt *rr_rxprt;
@@ -201,14 +221,13 @@ enum rpcrdma_frmr_state {
};
struct rpcrdma_frmr {
- struct scatterlist *sg;
- int sg_nents;
+ struct scatterlist *fr_sg;
+ int fr_nents;
+ enum dma_data_direction fr_dir;
struct ib_mr *fr_mr;
+ struct ib_cqe fr_cqe;
enum rpcrdma_frmr_state fr_state;
- struct work_struct fr_work;
- struct rpcrdma_xprt *fr_xprt;
- bool fr_waiter;
- struct completion fr_linv_done;;
+ struct completion fr_linv_done;
union {
struct ib_reg_wr fr_regwr;
struct ib_send_wr fr_invwr;
@@ -224,8 +243,9 @@ struct rpcrdma_mw {
union {
struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
- } r;
- void (*mw_sendcompletion)(struct ib_wc *);
+ };
+ struct work_struct mw_work;
+ struct rpcrdma_xprt *mw_xprt;
struct list_head mw_list;
struct list_head mw_all;
};
@@ -274,13 +294,16 @@ struct rpcrdma_req {
unsigned int rl_niovs;
unsigned int rl_nchunks;
unsigned int rl_connect_cookie;
+ struct rpc_task *rl_task;
struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
struct rpcrdma_regbuf *rl_rdmabuf;
struct rpcrdma_regbuf *rl_sendbuf;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
+ struct rpcrdma_mr_seg *rl_nextseg;
+ struct ib_cqe rl_cqe;
struct list_head rl_all;
bool rl_backchannel;
};
@@ -311,6 +334,7 @@ struct rpcrdma_buffer {
struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
u32 rb_max_requests;
+ atomic_t rb_credits; /* most recent credit grant */
u32 rb_bc_srv_max_requests;
spinlock_t rb_reqslock; /* protect rb_allreqs */
@@ -374,8 +398,8 @@ struct rpcrdma_memreg_ops {
struct rpcrdma_mr_seg *, int, bool);
void (*ro_unmap_sync)(struct rpcrdma_xprt *,
struct rpcrdma_req *);
- int (*ro_unmap)(struct rpcrdma_xprt *,
- struct rpcrdma_mr_seg *);
+ void (*ro_unmap_safe)(struct rpcrdma_xprt *,
+ struct rpcrdma_req *, bool);
int (*ro_open)(struct rpcrdma_ia *,
struct rpcrdma_ep *,
struct rpcrdma_create_data_internal *);
@@ -458,7 +482,6 @@ struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
void rpcrdma_free_regbuf(struct rpcrdma_ia *,
struct rpcrdma_regbuf *);
-unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
int frwr_alloc_recovery_wq(void);
@@ -521,6 +544,9 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
*/
int rpcrdma_marshal_req(struct rpc_rqst *);
+void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *,
+ struct rpcrdma_create_data_internal *,
+ unsigned int);
/* RPC/RDMA module init - xprtrdma/transport.c
*/
@@ -536,6 +562,7 @@ void xprt_rdma_cleanup(void);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
int xprt_rdma_bc_up(struct svc_serv *, struct net *);
+size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *);
int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index fde2138b81e7..7e2b2fa189c3 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -995,15 +995,14 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
u32 _xid;
__be32 *xp;
- repsize = skb->len - sizeof(struct udphdr);
+ repsize = skb->len;
if (repsize < 4) {
dprintk("RPC: impossible RPC reply size %d!\n", repsize);
return;
}
/* Copy the XID from the skb... */
- xp = skb_header_pointer(skb, sizeof(struct udphdr),
- sizeof(_xid), &_xid);
+ xp = skb_header_pointer(skb, 0, sizeof(_xid), &_xid);
if (xp == NULL)
return;
@@ -1019,11 +1018,11 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
/* Suck it into the iovec, verify checksum if not done by hw. */
if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
- UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
+ __UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
goto out_unlock;
}
- UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
+ __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
xprt_adjust_cwnd(xprt, task, copied);
xprt_complete_rqst(task, copied);
@@ -1365,6 +1364,11 @@ static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
return ret;
return 0;
}
+
+static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
+{
+ return PAGE_SIZE;
+}
#else
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
struct xdr_skb_reader *desc)
@@ -1844,9 +1848,7 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock)
*/
static void xs_local_rpcbind(struct rpc_task *task)
{
- rcu_read_lock();
- xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt));
- rcu_read_unlock();
+ xprt_set_bound(task->tk_xprt);
}
static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
@@ -1883,8 +1885,7 @@ static inline void xs_reclassify_socket6(struct socket *sock)
static inline void xs_reclassify_socket(int family, struct socket *sock)
{
- WARN_ON_ONCE(sock_owned_by_user(sock->sk));
- if (sock_owned_by_user(sock->sk))
+ if (WARN_ON_ONCE(!sock_allow_reclassification(sock->sk)))
return;
switch (family) {
@@ -1954,6 +1955,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
sk->sk_user_data = xprt;
sk->sk_data_ready = xs_data_ready;
sk->sk_write_space = xs_udp_write_space;
+ sock_set_flag(sk, SOCK_FASYNC);
sk->sk_error_report = xs_error_report;
sk->sk_allocation = GFP_NOIO;
@@ -2140,6 +2142,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
sk->sk_user_data = xprt;
sk->sk_data_ready = xs_data_ready;
sk->sk_write_space = xs_udp_write_space;
+ sock_set_flag(sk, SOCK_FASYNC);
sk->sk_allocation = GFP_NOIO;
xprt_set_connected(xprt);
@@ -2241,6 +2244,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
sk->sk_data_ready = xs_tcp_data_ready;
sk->sk_state_change = xs_tcp_state_change;
sk->sk_write_space = xs_tcp_write_space;
+ sock_set_flag(sk, SOCK_FASYNC);
sk->sk_error_report = xs_error_report;
sk->sk_allocation = GFP_NOIO;
@@ -2662,6 +2666,7 @@ static struct rpc_xprt_ops xs_tcp_ops = {
#ifdef CONFIG_SUNRPC_BACKCHANNEL
.bc_setup = xprt_setup_bc,
.bc_up = xs_tcp_bc_up,
+ .bc_maxpayload = xs_tcp_bc_maxpayload,
.bc_free_rqst = xprt_free_bc_rqst,
.bc_destroy = xprt_destroy_bc,
#endif
@@ -3052,6 +3057,7 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
return xprt;
args->bc_xprt->xpt_bc_xprt = NULL;
+ args->bc_xprt->xpt_bc_xps = NULL;
xprt_put(xprt);
ret = ERR_PTR(-EINVAL);
out_err: