Re: [PATCH 8/8] RFC: use a TASK_FIFO kthread for read completion support

From: Tal Zussman

Date: Thu Apr 09 2026 - 15:07:00 EST




On 4/9/26 12:02 PM, Christoph Hellwig wrote:
> Commit 3fffb589b9a6 ("erofs: add per-cpu threads for decompression as an
> option") explains why workqueues aren't great for low-latency completion
> handling. Switch to a per-cpu kthread to handle it instead. This code
> is based on the erofs code in the above commit, but further simplified
> by directly using a kthread instead of a kthread_work.
>
> Signed-off-by: Christoph Hellwig <hch@xxxxxx>
> ---
> block/bio.c | 117 +++++++++++++++++++++++++++++-----------------------
> 1 file changed, 65 insertions(+), 52 deletions(-)
>
> diff --git a/block/bio.c b/block/bio.c
> index 88d191455762..6a993fb129a0 100644
> --- a/block/bio.c
> +++ b/block/bio.c
> @@ -19,7 +19,7 @@
> #include <linux/blk-crypto.h>
> #include <linux/xarray.h>
> #include <linux/kmemleak.h>
> -#include <linux/llist.h>
> +#include <linux/freezer.h>

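Why freezer.h and not kthread.h? The new code calls kthread_create_on_cpu(),
which is declared in <linux/kthread.h>, and nothing in this patch appears to
use a freezer API.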

> #include <trace/events/block.h>
> #include "blk.h"
> @@ -1718,51 +1718,83 @@ void bio_check_pages_dirty(struct bio *bio)
> EXPORT_SYMBOL_GPL(bio_check_pages_dirty);
>
> struct bio_complete_batch {
> - struct llist_head list;

If we go with this approach, we should remove the newly-added bi_llist from
struct bio too.

> - struct delayed_work work;
> - int cpu;
> + spinlock_t lock;
> + struct bio_list bios;
> + struct task_struct *worker;
> };
>
> static DEFINE_PER_CPU(struct bio_complete_batch, bio_complete_batch);
> -static struct workqueue_struct *bio_complete_wq;
>
> -static void bio_complete_work_fn(struct work_struct *w)
> +static bool bio_try_complete_batch(struct bio_complete_batch *batch)
> {
> - struct delayed_work *dw = to_delayed_work(w);
> - struct bio_complete_batch *batch =
> - container_of(dw, struct bio_complete_batch, work);
> - struct llist_node *node;
> - struct bio *bio, *next;
> + struct bio_list bios;
> + unsigned long flags;
> + struct bio *bio;
>
> - do {
> - node = llist_del_all(&batch->list);
> - if (!node)
> - break;
> + spin_lock_irqsave(&batch->lock, flags);
> + bios = batch->bios;
> + bio_list_init(&batch->bios);
> + spin_unlock_irqrestore(&batch->lock, flags);
>
> - node = llist_reverse_order(node);
> - llist_for_each_entry_safe(bio, next, node, bi_llist)
> - bio->bi_end_io(bio);
> + if (bio_list_empty(&bios))
> + return false;
>
> - if (need_resched()) {
> - if (!llist_empty(&batch->list))
> - mod_delayed_work_on(batch->cpu,
> - bio_complete_wq,
> - &batch->work, 0);
> - break;
> - }
> - } while (1);
> + __set_current_state(TASK_RUNNING);
> + while ((bio = bio_list_pop(&bios)))
> + bio->bi_end_io(bio);
> + return true;
> +}
> +
> +static int bio_complete_thread(void *private)
> +{
> + struct bio_complete_batch *batch = private;
> +
> + for (;;) {
> + set_current_state(TASK_INTERRUPTIBLE);
> + if (!bio_try_complete_batch(batch))
> + schedule();
> + }
> +
> + return 0;
> }
>
> void __bio_complete_in_task(struct bio *bio)
> {
> - struct bio_complete_batch *batch = this_cpu_ptr(&bio_complete_batch);
> + struct bio_complete_batch *batch;
> + unsigned long flags;
> + bool wake;
> +
> + get_cpu();
> + batch = this_cpu_ptr(&bio_complete_batch);
> + spin_lock_irqsave(&batch->lock, flags);
> + wake = bio_list_empty(&batch->bios);
> + bio_list_add(&batch->bios, bio);
> + spin_unlock_irqrestore(&batch->lock, flags);
> + put_cpu();
>
> - if (llist_add(&bio->bi_llist, &batch->list))
> - mod_delayed_work_on(batch->cpu, bio_complete_wq,
> - &batch->work, 1);
> + if (wake)
> + wake_up_process(batch->worker);
> }
> EXPORT_SYMBOL_GPL(__bio_complete_in_task);
>
> +static void __init bio_complete_batch_init(int cpu)
> +{
> + struct bio_complete_batch *batch =
> + per_cpu_ptr(&bio_complete_batch, cpu);
> + struct task_struct *worker;
> +
> + worker = kthread_create_on_cpu(bio_complete_thread,
> + per_cpu_ptr(&bio_complete_batch, cpu),
> + cpu, "bio_worker/%u");
> + if (IS_ERR(worker))
> + panic("bio: can't create kthread_work");
> + sched_set_fifo_low(worker);
> +
> + spin_lock_init(&batch->lock);
> + bio_list_init(&batch->bios);
> + batch->worker = worker;
> +}
> +
> static inline bool bio_remaining_done(struct bio *bio)
> {
> /*
> @@ -2028,16 +2060,7 @@ EXPORT_SYMBOL(bioset_init);
> */
> static int bio_complete_batch_cpu_dead(unsigned int cpu)
> {
> - struct bio_complete_batch *batch =
> - per_cpu_ptr(&bio_complete_batch, cpu);
> - struct llist_node *node;
> - struct bio *bio, *next;
> -
> - node = llist_del_all(&batch->list);
> - node = llist_reverse_order(node);
> - llist_for_each_entry_safe(bio, next, node, bi_llist)
> - bio->bi_end_io(bio);
> -
> + bio_try_complete_batch(per_cpu_ptr(&bio_complete_batch, cpu));
> return 0;
> }
>
> @@ -2055,18 +2078,8 @@ static int __init init_bio(void)
> SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
> }
>
> - for_each_possible_cpu(i) {
> - struct bio_complete_batch *batch =
> - per_cpu_ptr(&bio_complete_batch, i);
> -
> - init_llist_head(&batch->list);
> - INIT_DELAYED_WORK(&batch->work, bio_complete_work_fn);
> - batch->cpu = i;
> - }
> -
> - bio_complete_wq = alloc_workqueue("bio_complete", WQ_MEM_RECLAIM, 0);
> - if (!bio_complete_wq)
> - panic("bio: can't allocate bio_complete workqueue\n");
> + for_each_possible_cpu(i)
> + bio_complete_batch_init(i);
>
> cpuhp_setup_state(CPUHP_BP_PREPARE_DYN, "block/bio:complete:dead",
> NULL, bio_complete_batch_cpu_dead);
> --
> 2.47.3
>