Diffstat (limited to 'libbcachefs/journal.c')
-rw-r--r--  libbcachefs/journal.c  209
1 file changed, 132 insertions(+), 77 deletions(-)
diff --git a/libbcachefs/journal.c b/libbcachefs/journal.c
index 1952643f..1870534d 100644
--- a/libbcachefs/journal.c
+++ b/libbcachefs/journal.c
@@ -31,6 +31,12 @@ static void journal_pin_add_entry(struct journal *,
struct journal_entry_pin *,
journal_pin_flush_fn);
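+/* Wake both wait-queue waiters and closures waiting asynchronously: */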
+static inline void journal_wake(struct journal *j)
+{
+ wake_up(&j->wait);
+ closure_wake_up(&j->async_wait);
+}
+
static inline struct journal_buf *journal_cur_buf(struct journal *j)
{
return j->buf + j->reservations.idx;
@@ -43,15 +49,27 @@ static inline struct journal_buf *journal_prev_buf(struct journal *j)
/* Sequence number of oldest dirty journal entry */
-static inline u64 last_seq(struct journal *j)
+static inline u64 journal_last_seq(struct journal *j)
{
- return atomic64_read(&j->seq) - fifo_used(&j->pin) + 1;
+ return j->pin.front;
}
static inline u64 journal_pin_seq(struct journal *j,
struct journal_entry_pin_list *pin_list)
{
- return last_seq(j) + fifo_entry_idx(&j->pin, pin_list);
+ return fifo_entry_idx_abs(&j->pin, pin_list);
+}
+
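+/* Returns the seq that @pin is holding open, or 0 if it isn't active: */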
+u64 bch2_journal_pin_seq(struct journal *j, struct journal_entry_pin *pin)
+{
+ u64 ret = 0;
+
+ spin_lock(&j->lock);
+ if (journal_pin_active(pin))
+ ret = journal_pin_seq(j, pin->pin_list);
+ spin_unlock(&j->lock);
+
+ return ret;
}
static inline void bch2_journal_add_entry_noreservation(struct journal_buf *buf,
@@ -678,7 +696,7 @@ reread: sectors_read = min_t(unsigned,
end - offset, buf->size >> 9);
bio_reset(bio);
- bio->bi_bdev = ca->disk_sb.bdev;
+ bio_set_dev(bio, ca->disk_sb.bdev);
bio->bi_iter.bi_sector = offset;
bio->bi_iter.bi_size = sectors_read << 9;
bio_set_op_attrs(bio, REQ_OP_READ, 0);
@@ -968,7 +986,7 @@ int bch2_journal_read(struct bch_fs *c, struct list_head *list)
struct journal_replay *i;
struct journal_entry_pin_list *p;
struct bch_dev *ca;
- u64 cur_seq, end_seq;
+ u64 cur_seq, end_seq, seq;
unsigned iter, keys = 0, entries = 0;
size_t nr;
int ret = 0;
@@ -1035,11 +1053,7 @@ int bch2_journal_read(struct bch_fs *c, struct list_head *list)
j->pin.front = le64_to_cpu(i->j.last_seq);
j->pin.back = le64_to_cpu(i->j.seq) + 1;
- BUG_ON(last_seq(j) != le64_to_cpu(i->j.last_seq));
- BUG_ON(journal_seq_pin(j, atomic64_read(&j->seq)) !=
- &fifo_peek_back(&j->pin));
-
- fifo_for_each_entry_ptr(p, &j->pin, iter) {
+ fifo_for_each_entry_ptr(p, &j->pin, seq) {
INIT_LIST_HEAD(&p->list);
INIT_LIST_HEAD(&p->flushed);
atomic_set(&p->count, 0);
@@ -1062,7 +1076,7 @@ int bch2_journal_read(struct bch_fs *c, struct list_head *list)
mutex_unlock(&j->blacklist_lock);
- cur_seq = last_seq(j);
+ cur_seq = journal_last_seq(j);
end_seq = le64_to_cpu(list_last_entry(list,
struct journal_replay, list)->j.seq);
@@ -1087,7 +1101,7 @@ int bch2_journal_read(struct bch_fs *c, struct list_head *list)
fsck_err_on(le64_to_cpu(i->j.seq) != cur_seq, c,
"journal entries %llu-%llu missing! (replaying %llu-%llu)",
cur_seq, le64_to_cpu(i->j.seq) - 1,
- last_seq(j), end_seq);
+ journal_last_seq(j), end_seq);
cur_seq = le64_to_cpu(i->j.seq) + 1;
@@ -1155,10 +1169,10 @@ static void journal_pin_new_entry(struct journal *j, int count)
/*
* The fifo_push() needs to happen at the same time as j->seq is
- * incremented for last_seq() to be calculated correctly
+ * incremented for journal_last_seq() to be calculated correctly
*/
- p = fifo_push_ref(&j->pin);
atomic64_inc(&j->seq);
+ p = fifo_push_ref(&j->pin);
EBUG_ON(journal_seq_pin(j, atomic64_read(&j->seq)) !=
&fifo_peek_back(&j->pin));
@@ -1237,7 +1251,7 @@ static enum {
journal_reclaim_fast(j);
/* XXX: why set this here, and not in journal_write()? */
- buf->data->last_seq = cpu_to_le64(last_seq(j));
+ buf->data->last_seq = cpu_to_le64(journal_last_seq(j));
journal_pin_new_entry(j, 1);
@@ -1272,7 +1286,7 @@ void bch2_journal_halt(struct journal *j)
} while ((v = atomic64_cmpxchg(&j->reservations.counter,
old.v, new.v)) != old.v);
- wake_up(&j->wait);
+ journal_wake(j);
closure_wake_up(&journal_cur_buf(j)->wait);
closure_wake_up(&journal_prev_buf(j)->wait);
}
@@ -1302,7 +1316,7 @@ static unsigned journal_dev_buckets_available(struct journal *j,
* Don't use the last bucket unless writing the new last_seq
* will make another bucket available:
*/
- if (ja->bucket_seq[ja->last_idx] >= last_seq(j))
+ if (ja->bucket_seq[ja->last_idx] >= journal_last_seq(j))
available = max((int) available - 1, 0);
return available;
@@ -1441,7 +1455,7 @@ static int journal_entry_open(struct journal *j)
mod_delayed_work(system_freezable_wq,
&j->write_work,
msecs_to_jiffies(j->write_delay_ms));
- wake_up(&j->wait);
+ journal_wake(j);
return 1;
}
@@ -1537,7 +1551,7 @@ int bch2_journal_replay(struct bch_fs *c, struct list_head *list)
}
if (atomic_dec_and_test(&j->replay_pin_list->count))
- wake_up(&j->wait);
+ journal_wake(j);
}
j->replay_pin_list = NULL;
@@ -1704,7 +1718,7 @@ static void journal_reclaim_fast(struct journal *j)
}
if (popped)
- wake_up(&j->wait);
+ journal_wake(j);
}
/*
@@ -1718,6 +1732,7 @@ static inline void __journal_pin_add(struct journal *j,
journal_pin_flush_fn flush_fn)
{
BUG_ON(journal_pin_active(pin));
+ BUG_ON(!atomic_read(&pin_list->count));
atomic_inc(&pin_list->count);
pin->pin_list = pin_list;
@@ -1727,7 +1742,12 @@ static inline void __journal_pin_add(struct journal *j,
list_add(&pin->list, &pin_list->list);
else
INIT_LIST_HEAD(&pin->list);
- wake_up(&j->wait);
+
+ /*
+ * If the journal is currently full, we might want to call flush_fn
+ * immediately:
+ */
+ journal_wake(j);
}
static void journal_pin_add_entry(struct journal *j,
@@ -1754,38 +1774,32 @@ void bch2_journal_pin_add(struct journal *j,
spin_unlock(&j->lock);
}
-static inline bool __journal_pin_drop(struct journal *j,
+static inline void __journal_pin_drop(struct journal *j,
struct journal_entry_pin *pin)
{
struct journal_entry_pin_list *pin_list = pin->pin_list;
- pin->pin_list = NULL;
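+ /* safe to call on a pin that was never added, or already dropped: */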
+ if (!journal_pin_active(pin))
+ return;
- /* journal_reclaim_work() might have already taken us off the list */
- if (!list_empty_careful(&pin->list))
- list_del_init(&pin->list);
+ pin->pin_list = NULL;
+ list_del_init(&pin->list);
- return atomic_dec_and_test(&pin_list->count);
+ /*
+ * Unpinning a journal entry may make journal_next_bucket() succeed, if
+ * writing a new last_seq will now make another bucket available:
+ */
+ if (atomic_dec_and_test(&pin_list->count) &&
+ pin_list == &fifo_peek_front(&j->pin))
+ journal_reclaim_fast(j);
}
void bch2_journal_pin_drop(struct journal *j,
struct journal_entry_pin *pin)
{
- bool wakeup = false;
-
spin_lock(&j->lock);
- if (journal_pin_active(pin))
- wakeup = __journal_pin_drop(j, pin);
+ __journal_pin_drop(j, pin);
spin_unlock(&j->lock);
-
- /*
- * Unpinning a journal entry make make journal_next_bucket() succeed, if
- * writing a new last_seq will now make another bucket available:
- *
- * Nested irqsave is expensive, don't do the wakeup with lock held:
- */
- if (wakeup)
- wake_up(&j->wait);
}
void bch2_journal_pin_add_if_older(struct journal *j,
@@ -1797,10 +1811,9 @@ void bch2_journal_pin_add_if_older(struct journal *j,
if (journal_pin_active(src_pin) &&
(!journal_pin_active(pin) ||
- fifo_entry_idx(&j->pin, src_pin->pin_list) <
- fifo_entry_idx(&j->pin, pin->pin_list))) {
- if (journal_pin_active(pin))
- __journal_pin_drop(j, pin);
+ journal_pin_seq(j, src_pin->pin_list) <
+ journal_pin_seq(j, pin->pin_list))) {
+ __journal_pin_drop(j, pin);
__journal_pin_add(j, src_pin->pin_list, pin, flush_fn);
}
@@ -1812,13 +1825,13 @@ __journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq)
{
struct journal_entry_pin_list *pin_list;
struct journal_entry_pin *ret;
- unsigned iter;
+ u64 iter;
/* no need to iterate over empty fifo entries: */
journal_reclaim_fast(j);
fifo_for_each_entry_ptr(pin_list, &j->pin, iter) {
- if (journal_pin_seq(j, pin_list) > seq_to_flush)
+ if (iter > seq_to_flush)
break;
ret = list_first_entry_or_null(&pin_list->list,
@@ -1826,7 +1839,7 @@ __journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq)
if (ret) {
/* must be list_del_init(), see bch2_journal_pin_drop() */
list_move(&ret->list, &pin_list->flushed);
- *seq = journal_pin_seq(j, pin_list);
+ *seq = iter;
return ret;
}
}
@@ -1863,9 +1876,9 @@ static int journal_flush_done(struct journal *j, u64 seq_to_flush,
* If journal replay hasn't completed, the unreplayed journal entries
* hold refs on their corresponding sequence numbers
*/
- ret = (*pin = __journal_get_next_pin(j, seq_to_flush, pin_seq)) ||
+ ret = (*pin = __journal_get_next_pin(j, seq_to_flush, pin_seq)) != NULL ||
!test_bit(JOURNAL_REPLAY_DONE, &j->flags) ||
- last_seq(j) > seq_to_flush ||
+ journal_last_seq(j) > seq_to_flush ||
(fifo_used(&j->pin) == 1 &&
atomic_read(&fifo_peek_front(&j->pin).count) == 1);
spin_unlock(&j->lock);
@@ -1891,7 +1904,7 @@ again:
}
spin_lock(&j->lock);
- flush = last_seq(j) != j->last_seq_ondisk ||
+ flush = journal_last_seq(j) != j->last_seq_ondisk ||
(seq_to_flush == U64_MAX && c->btree_roots_dirty);
spin_unlock(&j->lock);
@@ -1984,7 +1997,7 @@ static void journal_reclaim_work(struct work_struct *work)
ja->last_idx = (ja->last_idx + 1) % ja->nr;
spin_unlock(&j->lock);
- wake_up(&j->wait);
+ journal_wake(j);
}
/*
@@ -2224,7 +2237,7 @@ out:
&j->reservations.counter);
closure_wake_up(&w->wait);
- wake_up(&j->wait);
+ journal_wake(j);
if (test_bit(JOURNAL_NEED_WRITE, &j->flags))
mod_delayed_work(system_freezable_wq, &j->write_work, 0);
@@ -2338,8 +2351,8 @@ static void journal_write(struct closure *cl)
bio = ca->journal.bio;
bio_reset(bio);
+ bio_set_dev(bio, ca->disk_sb.bdev);
bio->bi_iter.bi_sector = ptr->offset;
- bio->bi_bdev = ca->disk_sb.bdev;
bio->bi_iter.bi_size = sectors << 9;
bio->bi_end_io = journal_write_endio;
bio->bi_private = ca;
@@ -2360,7 +2373,7 @@ static void journal_write(struct closure *cl)
bio = ca->journal.bio;
bio_reset(bio);
- bio->bi_bdev = ca->disk_sb.bdev;
+ bio_set_dev(bio, ca->disk_sb.bdev);
bio->bi_opf = REQ_OP_FLUSH;
bio->bi_end_io = journal_write_endio;
bio->bi_private = ca;
@@ -2377,19 +2390,34 @@ err:
continue_at(cl, journal_write_done, system_highpri_wq);
}
-static void journal_write_work(struct work_struct *work)
+/*
+ * Returns true if there's nothing to flush and no journal write is still in flight
+ */
+static bool journal_flush_write(struct journal *j)
{
- struct journal *j = container_of(to_delayed_work(work),
- struct journal, write_work);
+ bool ret;
+
spin_lock(&j->lock);
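+ /* true if no journal write is currently in flight: */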
+ ret = !j->reservations.prev_buf_unwritten;
+
if (!journal_entry_is_open(j)) {
spin_unlock(&j->lock);
- return;
+ return ret;
}
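+ /*
+ * An entry is open: mark it as needing to be written; if
+ * journal_buf_switch() kicks off the write, it drops j->lock itself:
+ */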
set_bit(JOURNAL_NEED_WRITE, &j->flags);
- if (journal_buf_switch(j, false) != JOURNAL_UNLOCKED)
+ if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED)
+ ret = false;
+ else
spin_unlock(&j->lock);
+ return ret;
+}
+
+static void journal_write_work(struct work_struct *work)
+{
+ struct journal *j = container_of(work, struct journal, write_work.work);
+
+ journal_flush_write(j);
}
/*
@@ -2514,6 +2542,43 @@ int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
return ret < 0 ? ret : 0;
}
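+/*
+ * Return the seq of the oldest journal entry that hasn't yet been written:
+ */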
+u64 bch2_journal_last_unwritten_seq(struct journal *j)
+{
+ u64 seq;
+
+ spin_lock(&j->lock);
+ seq = atomic64_read(&j->seq);
+ if (j->reservations.prev_buf_unwritten)
+ seq--;
+ spin_unlock(&j->lock);
+
+ return seq;
+}
+
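+/*
+ * Try to open the journal entry @seq: returns nonzero if it's open (or seq
+ * has already passed), 0 if the caller must wait on @parent while journal
+ * reclaim frees up space:
+ */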
+int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent)
+{
+ int ret;
+
+ spin_lock(&j->lock);
+ BUG_ON(seq > atomic64_read(&j->seq));
+
+ if (seq < atomic64_read(&j->seq) ||
+ journal_entry_is_open(j)) {
+ spin_unlock(&j->lock);
+ return 1;
+ }
+
+ ret = journal_entry_open(j);
+ if (!ret)
+ closure_wait(&j->async_wait, parent);
+ spin_unlock(&j->lock);
+
+ if (!ret)
+ journal_reclaim_work(&j->reclaim_work.work);
+
+ return ret;
+}
+
void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
{
spin_lock(&j->lock);
@@ -2737,14 +2802,13 @@ int bch2_journal_flush_device(struct journal *j, unsigned dev_idx)
struct bch_fs *c = container_of(j, struct bch_fs, journal);
struct journal_entry_pin_list *p;
struct bch_devs_list devs;
- u64 seq = 0;
- unsigned iter;
+ u64 iter, seq = 0;
int ret = 0;
spin_lock(&j->lock);
fifo_for_each_entry_ptr(p, &j->pin, iter)
if (bch2_dev_list_has_dev(p->devs, dev_idx))
- seq = journal_pin_seq(j, p);
+ seq = iter;
spin_unlock(&j->lock);
ret = bch2_journal_flush_pins(j, seq);
@@ -2758,7 +2822,7 @@ int bch2_journal_flush_device(struct journal *j, unsigned dev_idx)
spin_lock(&j->lock);
while (!ret && seq < atomic64_read(&j->seq)) {
- seq = max(seq, last_seq(j));
+ seq = max(seq, journal_last_seq(j));
devs = journal_seq_pin(j, seq)->devs;
seq++;
@@ -2804,15 +2868,7 @@ void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
void bch2_fs_journal_stop(struct journal *j)
{
- if (!test_bit(JOURNAL_STARTED, &j->flags))
- return;
-
- /*
- * Empty out the journal by first flushing everything pinning existing
- * journal entries, then force a brand new empty journal entry to be
- * written:
- */
- bch2_journal_flush_all_pins(j);
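+ /* keep flushing until the journal is empty and the last write completes: */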
+ wait_event(j->wait, journal_flush_write(j));
cancel_delayed_work_sync(&j->write_work);
cancel_delayed_work_sync(&j->reclaim_work);
@@ -2914,7 +2970,7 @@ ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
spin_lock(&j->lock);
ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "active journal entries:\t%zu\n"
+ "active journal entries:\t%llu\n"
"seq:\t\t\t%llu\n"
"last_seq:\t\t%llu\n"
"last_seq_ondisk:\t%llu\n"
@@ -2927,7 +2983,7 @@ ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
"replay done:\t\t%i\n",
fifo_used(&j->pin),
(u64) atomic64_read(&j->seq),
- last_seq(j),
+ journal_last_seq(j),
j->last_seq_ondisk,
journal_state_count(*s, s->idx),
s->cur_entry_offset,
@@ -2965,14 +3021,13 @@ ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
struct journal_entry_pin_list *pin_list;
struct journal_entry_pin *pin;
ssize_t ret = 0;
- unsigned i;
+ u64 i;
spin_lock(&j->lock);
fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
ret += scnprintf(buf + ret, PAGE_SIZE - ret,
"%llu: count %u\n",
- journal_pin_seq(j, pin_list),
- atomic_read(&pin_list->count));
+ i, atomic_read(&pin_list->count));
list_for_each_entry(pin, &pin_list->list, list)
ret += scnprintf(buf + ret, PAGE_SIZE - ret,