diff options
Diffstat (limited to 'libbcachefs/journal.c')
-rw-r--r-- | libbcachefs/journal.c | 209 |
1 files changed, 132 insertions, 77 deletions
diff --git a/libbcachefs/journal.c b/libbcachefs/journal.c index 1952643f..1870534d 100644 --- a/libbcachefs/journal.c +++ b/libbcachefs/journal.c @@ -31,6 +31,12 @@ static void journal_pin_add_entry(struct journal *, struct journal_entry_pin *, journal_pin_flush_fn); +static inline void journal_wake(struct journal *j) +{ + wake_up(&j->wait); + closure_wake_up(&j->async_wait); +} + static inline struct journal_buf *journal_cur_buf(struct journal *j) { return j->buf + j->reservations.idx; @@ -43,15 +49,27 @@ static inline struct journal_buf *journal_prev_buf(struct journal *j) /* Sequence number of oldest dirty journal entry */ -static inline u64 last_seq(struct journal *j) +static inline u64 journal_last_seq(struct journal *j) { - return atomic64_read(&j->seq) - fifo_used(&j->pin) + 1; + return j->pin.front; } static inline u64 journal_pin_seq(struct journal *j, struct journal_entry_pin_list *pin_list) { - return last_seq(j) + fifo_entry_idx(&j->pin, pin_list); + return fifo_entry_idx_abs(&j->pin, pin_list); +} + +u64 bch2_journal_pin_seq(struct journal *j, struct journal_entry_pin *pin) +{ + u64 ret = 0; + + spin_lock(&j->lock); + if (journal_pin_active(pin)) + ret = journal_pin_seq(j, pin->pin_list); + spin_unlock(&j->lock); + + return ret; } static inline void bch2_journal_add_entry_noreservation(struct journal_buf *buf, @@ -678,7 +696,7 @@ reread: sectors_read = min_t(unsigned, end - offset, buf->size >> 9); bio_reset(bio); - bio->bi_bdev = ca->disk_sb.bdev; + bio_set_dev(bio, ca->disk_sb.bdev); bio->bi_iter.bi_sector = offset; bio->bi_iter.bi_size = sectors_read << 9; bio_set_op_attrs(bio, REQ_OP_READ, 0); @@ -968,7 +986,7 @@ int bch2_journal_read(struct bch_fs *c, struct list_head *list) struct journal_replay *i; struct journal_entry_pin_list *p; struct bch_dev *ca; - u64 cur_seq, end_seq; + u64 cur_seq, end_seq, seq; unsigned iter, keys = 0, entries = 0; size_t nr; int ret = 0; @@ -1035,11 +1053,7 @@ int bch2_journal_read(struct bch_fs *c, struct list_head *list) j->pin.front = le64_to_cpu(i->j.last_seq); j->pin.back = le64_to_cpu(i->j.seq) + 1; - BUG_ON(last_seq(j) != le64_to_cpu(i->j.last_seq)); - BUG_ON(journal_seq_pin(j, atomic64_read(&j->seq)) != - &fifo_peek_back(&j->pin)); - - fifo_for_each_entry_ptr(p, &j->pin, iter) { + fifo_for_each_entry_ptr(p, &j->pin, seq) { INIT_LIST_HEAD(&p->list); INIT_LIST_HEAD(&p->flushed); atomic_set(&p->count, 0); @@ -1062,7 +1076,7 @@ int bch2_journal_read(struct bch_fs *c, struct list_head *list) mutex_unlock(&j->blacklist_lock); - cur_seq = last_seq(j); + cur_seq = journal_last_seq(j); end_seq = le64_to_cpu(list_last_entry(list, struct journal_replay, list)->j.seq); @@ -1087,7 +1101,7 @@ int bch2_journal_read(struct bch_fs *c, struct list_head *list) fsck_err_on(le64_to_cpu(i->j.seq) != cur_seq, c, "journal entries %llu-%llu missing! (replaying %llu-%llu)", cur_seq, le64_to_cpu(i->j.seq) - 1, - last_seq(j), end_seq); + journal_last_seq(j), end_seq); cur_seq = le64_to_cpu(i->j.seq) + 1; @@ -1155,10 +1169,10 @@ static void journal_pin_new_entry(struct journal *j, int count) /* * The fifo_push() needs to happen at the same time as j->seq is - * incremented for last_seq() to be calculated correctly + * incremented for journal_last_seq() to be calculated correctly */ - p = fifo_push_ref(&j->pin); atomic64_inc(&j->seq); + p = fifo_push_ref(&j->pin); EBUG_ON(journal_seq_pin(j, atomic64_read(&j->seq)) != &fifo_peek_back(&j->pin)); @@ -1237,7 +1251,7 @@ static enum { journal_reclaim_fast(j); /* XXX: why set this here, and not in journal_write()? */ - buf->data->last_seq = cpu_to_le64(last_seq(j)); + buf->data->last_seq = cpu_to_le64(journal_last_seq(j)); journal_pin_new_entry(j, 1); @@ -1272,7 +1286,7 @@ void bch2_journal_halt(struct journal *j) } while ((v = atomic64_cmpxchg(&j->reservations.counter, old.v, new.v)) != old.v); - wake_up(&j->wait); + journal_wake(j); closure_wake_up(&journal_cur_buf(j)->wait); closure_wake_up(&journal_prev_buf(j)->wait); } @@ -1302,7 +1316,7 @@ static unsigned journal_dev_buckets_available(struct journal *j, * Don't use the last bucket unless writing the new last_seq * will make another bucket available: */ - if (ja->bucket_seq[ja->last_idx] >= last_seq(j)) + if (ja->bucket_seq[ja->last_idx] >= journal_last_seq(j)) available = max((int) available - 1, 0); return available; @@ -1441,7 +1455,7 @@ static int journal_entry_open(struct journal *j) mod_delayed_work(system_freezable_wq, &j->write_work, msecs_to_jiffies(j->write_delay_ms)); - wake_up(&j->wait); + journal_wake(j); return 1; } @@ -1537,7 +1551,7 @@ int bch2_journal_replay(struct bch_fs *c, struct list_head *list) } if (atomic_dec_and_test(&j->replay_pin_list->count)) - wake_up(&j->wait); + journal_wake(j); } j->replay_pin_list = NULL; @@ -1704,7 +1718,7 @@ static void journal_reclaim_fast(struct journal *j) } if (popped) - wake_up(&j->wait); + journal_wake(j); } /* @@ -1718,6 +1732,7 @@ static inline void __journal_pin_add(struct journal *j, journal_pin_flush_fn flush_fn) { BUG_ON(journal_pin_active(pin)); + BUG_ON(!atomic_read(&pin_list->count)); atomic_inc(&pin_list->count); pin->pin_list = pin_list; @@ -1727,7 +1742,12 @@ static inline void __journal_pin_add(struct journal *j, list_add(&pin->list, &pin_list->list); else INIT_LIST_HEAD(&pin->list); - wake_up(&j->wait); + + /* + * If the journal is currently full, we might want to call flush_fn + * immediately: + */ + journal_wake(j); } static void journal_pin_add_entry(struct journal *j, @@ -1754,38 +1774,32 @@ void bch2_journal_pin_add(struct journal *j, spin_unlock(&j->lock); } -static inline bool __journal_pin_drop(struct journal *j, +static inline void __journal_pin_drop(struct journal *j, struct journal_entry_pin *pin) { struct journal_entry_pin_list *pin_list = pin->pin_list; - pin->pin_list = NULL; + if (!journal_pin_active(pin)) + return; - /* journal_reclaim_work() might have already taken us off the list */ - if (!list_empty_careful(&pin->list)) - list_del_init(&pin->list); + pin->pin_list = NULL; + list_del_init(&pin->list); - return atomic_dec_and_test(&pin_list->count); + /* + * Unpinning a journal entry make make journal_next_bucket() succeed, if + * writing a new last_seq will now make another bucket available: + */ + if (atomic_dec_and_test(&pin_list->count) && + pin_list == &fifo_peek_front(&j->pin)) + journal_reclaim_fast(j); } void bch2_journal_pin_drop(struct journal *j, struct journal_entry_pin *pin) { - bool wakeup = false; - spin_lock(&j->lock); - if (journal_pin_active(pin)) - wakeup = __journal_pin_drop(j, pin); + __journal_pin_drop(j, pin); spin_unlock(&j->lock); - - /* - * Unpinning a journal entry make make journal_next_bucket() succeed, if - * writing a new last_seq will now make another bucket available: - * - * Nested irqsave is expensive, don't do the wakeup with lock held: - */ - if (wakeup) - wake_up(&j->wait); } void bch2_journal_pin_add_if_older(struct journal *j, @@ -1797,10 +1811,9 @@ void bch2_journal_pin_add_if_older(struct journal *j, if (journal_pin_active(src_pin) && (!journal_pin_active(pin) || - fifo_entry_idx(&j->pin, src_pin->pin_list) < - fifo_entry_idx(&j->pin, pin->pin_list))) { - if (journal_pin_active(pin)) - __journal_pin_drop(j, pin); + journal_pin_seq(j, src_pin->pin_list) < + journal_pin_seq(j, pin->pin_list))) { + __journal_pin_drop(j, pin); __journal_pin_add(j, src_pin->pin_list, pin, flush_fn); } @@ -1812,13 +1825,13 @@ __journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq) { struct journal_entry_pin_list *pin_list; struct journal_entry_pin *ret; - unsigned iter; + u64 iter; /* no need to iterate over empty fifo entries: */ journal_reclaim_fast(j); fifo_for_each_entry_ptr(pin_list, &j->pin, iter) { - if (journal_pin_seq(j, pin_list) > seq_to_flush) + if (iter > seq_to_flush) break; ret = list_first_entry_or_null(&pin_list->list, @@ -1826,7 +1839,7 @@ __journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq) if (ret) { /* must be list_del_init(), see bch2_journal_pin_drop() */ list_move(&ret->list, &pin_list->flushed); - *seq = journal_pin_seq(j, pin_list); + *seq = iter; return ret; } } @@ -1863,9 +1876,9 @@ static int journal_flush_done(struct journal *j, u64 seq_to_flush, * If journal replay hasn't completed, the unreplayed journal entries * hold refs on their corresponding sequence numbers */ - ret = (*pin = __journal_get_next_pin(j, seq_to_flush, pin_seq)) || + ret = (*pin = __journal_get_next_pin(j, seq_to_flush, pin_seq)) != NULL || !test_bit(JOURNAL_REPLAY_DONE, &j->flags) || - last_seq(j) > seq_to_flush || + journal_last_seq(j) > seq_to_flush || (fifo_used(&j->pin) == 1 && atomic_read(&fifo_peek_front(&j->pin).count) == 1); spin_unlock(&j->lock); @@ -1891,7 +1904,7 @@ again: } spin_lock(&j->lock); - flush = last_seq(j) != j->last_seq_ondisk || + flush = journal_last_seq(j) != j->last_seq_ondisk || (seq_to_flush == U64_MAX && c->btree_roots_dirty); spin_unlock(&j->lock); @@ -1984,7 +1997,7 @@ static void journal_reclaim_work(struct work_struct *work) ja->last_idx = (ja->last_idx + 1) % ja->nr; spin_unlock(&j->lock); - wake_up(&j->wait); + journal_wake(j); } /* @@ -2224,7 +2237,7 @@ out: &j->reservations.counter); closure_wake_up(&w->wait); - wake_up(&j->wait); + journal_wake(j); if (test_bit(JOURNAL_NEED_WRITE, &j->flags)) mod_delayed_work(system_freezable_wq, &j->write_work, 0); @@ -2338,8 +2351,8 @@ static void journal_write(struct closure *cl) bio = ca->journal.bio; bio_reset(bio); + bio_set_dev(bio, ca->disk_sb.bdev); bio->bi_iter.bi_sector = ptr->offset; - bio->bi_bdev = ca->disk_sb.bdev; bio->bi_iter.bi_size = sectors << 9; bio->bi_end_io = journal_write_endio; bio->bi_private = ca; @@ -2360,7 +2373,7 @@ static void journal_write(struct closure *cl) bio = ca->journal.bio; bio_reset(bio); - bio->bi_bdev = ca->disk_sb.bdev; + bio_set_dev(bio, ca->disk_sb.bdev); bio->bi_opf = REQ_OP_FLUSH; bio->bi_end_io = journal_write_endio; bio->bi_private = ca; @@ -2377,19 +2390,34 @@ err: continue_at(cl, journal_write_done, system_highpri_wq); } -static void journal_write_work(struct work_struct *work) +/* + * returns true if there's nothing to flush and no journal write still in flight + */ +static bool journal_flush_write(struct journal *j) { - struct journal *j = container_of(to_delayed_work(work), - struct journal, write_work); + bool ret; + spin_lock(&j->lock); + ret = !j->reservations.prev_buf_unwritten; + if (!journal_entry_is_open(j)) { spin_unlock(&j->lock); - return; + return ret; } set_bit(JOURNAL_NEED_WRITE, &j->flags); - if (journal_buf_switch(j, false) != JOURNAL_UNLOCKED) + if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED) + ret = false; + else spin_unlock(&j->lock); + return ret; +} + +static void journal_write_work(struct work_struct *work) +{ + struct journal *j = container_of(work, struct journal, write_work.work); + + journal_flush_write(j); } /* @@ -2514,6 +2542,43 @@ int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res, return ret < 0 ? ret : 0; } +u64 bch2_journal_last_unwritten_seq(struct journal *j) +{ + u64 seq; + + spin_lock(&j->lock); + seq = atomic64_read(&j->seq); + if (j->reservations.prev_buf_unwritten) + seq--; + spin_unlock(&j->lock); + + return seq; +} + +int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent) +{ + int ret; + + spin_lock(&j->lock); + BUG_ON(seq > atomic64_read(&j->seq)); + + if (seq < atomic64_read(&j->seq) || + journal_entry_is_open(j)) { + spin_unlock(&j->lock); + return 1; + } + + ret = journal_entry_open(j); + if (!ret) + closure_wait(&j->async_wait, parent); + spin_unlock(&j->lock); + + if (!ret) + journal_reclaim_work(&j->reclaim_work.work); + + return ret; +} + void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent) { spin_lock(&j->lock); @@ -2737,14 +2802,13 @@ int bch2_journal_flush_device(struct journal *j, unsigned dev_idx) struct bch_fs *c = container_of(j, struct bch_fs, journal); struct journal_entry_pin_list *p; struct bch_devs_list devs; - u64 seq = 0; - unsigned iter; + u64 iter, seq = 0; int ret = 0; spin_lock(&j->lock); fifo_for_each_entry_ptr(p, &j->pin, iter) if (bch2_dev_list_has_dev(p->devs, dev_idx)) - seq = journal_pin_seq(j, p); + seq = iter; spin_unlock(&j->lock); ret = bch2_journal_flush_pins(j, seq); @@ -2758,7 +2822,7 @@ int bch2_journal_flush_device(struct journal *j, unsigned dev_idx) spin_lock(&j->lock); while (!ret && seq < atomic64_read(&j->seq)) { - seq = max(seq, last_seq(j)); + seq = max(seq, journal_last_seq(j)); devs = journal_seq_pin(j, seq)->devs; seq++; @@ -2804,15 +2868,7 @@ void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca) void bch2_fs_journal_stop(struct journal *j) { - if (!test_bit(JOURNAL_STARTED, &j->flags)) - return; - - /* - * Empty out the journal by first flushing everything pinning existing - * journal entries, then force a brand new empty journal entry to be - * written: - */ - bch2_journal_flush_all_pins(j); + wait_event(j->wait, journal_flush_write(j)); cancel_delayed_work_sync(&j->write_work); cancel_delayed_work_sync(&j->reclaim_work); @@ -2914,7 +2970,7 @@ ssize_t bch2_journal_print_debug(struct journal *j, char *buf) spin_lock(&j->lock); ret += scnprintf(buf + ret, PAGE_SIZE - ret, - "active journal entries:\t%zu\n" + "active journal entries:\t%llu\n" "seq:\t\t\t%llu\n" "last_seq:\t\t%llu\n" "last_seq_ondisk:\t%llu\n" @@ -2927,7 +2983,7 @@ ssize_t bch2_journal_print_debug(struct journal *j, char *buf) "replay done:\t\t%i\n", fifo_used(&j->pin), (u64) atomic64_read(&j->seq), - last_seq(j), + journal_last_seq(j), j->last_seq_ondisk, journal_state_count(*s, s->idx), s->cur_entry_offset, @@ -2965,14 +3021,13 @@ ssize_t bch2_journal_print_pins(struct journal *j, char *buf) struct journal_entry_pin_list *pin_list; struct journal_entry_pin *pin; ssize_t ret = 0; - unsigned i; + u64 i; spin_lock(&j->lock); fifo_for_each_entry_ptr(pin_list, &j->pin, i) { ret += scnprintf(buf + ret, PAGE_SIZE - ret, "%llu: count %u\n", - journal_pin_seq(j, pin_list), - atomic_read(&pin_list->count)); + i, atomic_read(&pin_list->count)); list_for_each_entry(pin, &pin_list->list, list) ret += scnprintf(buf + ret, PAGE_SIZE - ret, |