Merge tag 'wq-for-7.1' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq

Pull workqueue updates from Tejun Heo:

 - New default WQ_AFFN_CACHE_SHARD affinity scope subdivides LLCs into
   smaller shards to improve scalability on machines with many CPUs per
   LLC

 - Misc:
    - system_dfl_long_wq for long unbound works
    - devm_alloc_workqueue() for device-managed allocation
    - sysfs exposure for ordered workqueues and the EFI workqueue
    - removal of HK_TYPE_WQ from wq_unbound_cpumask
    - various small fixes

* tag 'wq-for-7.1' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq: (21 commits)
  workqueue: validate cpumask_first() result in llc_populate_cpu_shard_id()
  workqueue: use NR_STD_WORKER_POOLS instead of hardcoded value
  workqueue: avoid unguarded 64-bit division
  docs: workqueue: document WQ_AFFN_CACHE_SHARD affinity scope
  workqueue: add test_workqueue benchmark module
  tools/workqueue: add CACHE_SHARD support to wq_dump.py
  workqueue: set WQ_AFFN_CACHE_SHARD as the default affinity scope
  workqueue: add WQ_AFFN_CACHE_SHARD affinity scope
  workqueue: fix typo in WQ_AFFN_SMT comment
  workqueue: Remove HK_TYPE_WQ from affecting wq_unbound_cpumask
  workqueue: unlink pwqs from wq->pwqs list in alloc_and_link_pwqs() error path
  workqueue: Remove NULL wq WARN in __queue_delayed_work()
  workqueue: fix parse_affn_scope() prefix matching bug
  workqueue: devres: Add device-managed allocate workqueue
  workqueue: Add system_dfl_long_wq for long unbound works
  tools/workqueue/wq_dump.py: add NODE prefix to all node columns
  tools/workqueue/wq_dump.py: fix column alignment in node_nr/max_active section
  tools/workqueue/wq_dump.py: remove backslash separator from node_nr/max_active header
  efi: Allow to expose the workqueue via sysfs
  workqueue: Allow to expose ordered workqueues via sysfs
  ...
This commit is contained in:
Linus Torvalds
2026-04-15 10:32:08 -07:00
10 changed files with 629 additions and 51 deletions

View File

@@ -8543,7 +8543,8 @@ Kernel parameters
workqueue.default_affinity_scope=
Select the default affinity scope to use for unbound
workqueues. Can be one of "cpu", "smt", "cache",
"numa" and "system". Default is "cache". For more
"cache_shard", "numa" and "system". Default is
"cache_shard". For more
information, see the Affinity Scopes section in
Documentation/core-api/workqueue.rst.

View File

@@ -378,9 +378,9 @@ Affinity Scopes
An unbound workqueue groups CPUs according to its affinity scope to improve
cache locality. For example, if a workqueue is using the default affinity
scope of "cache", it will group CPUs according to last level cache
boundaries. A work item queued on the workqueue will be assigned to a worker
on one of the CPUs which share the last level cache with the issuing CPU.
scope of "cache_shard", it will group CPUs into sub-LLC shards. A work item
queued on the workqueue will be assigned to a worker on one of the CPUs
within the same shard as the issuing CPU.
Once started, the worker may or may not be allowed to move outside the scope
depending on the ``affinity_strict`` setting of the scope.
@@ -402,7 +402,13 @@ Workqueue currently supports the following affinity scopes.
``cache``
CPUs are grouped according to cache boundaries. Which specific cache
boundary is used is determined by the arch code. L3 is used in a lot of
cases. This is the default affinity scope.
cases.
``cache_shard``
CPUs are grouped into sub-LLC shards of at most ``wq_cache_shard_size``
cores (default 8, tunable via the ``workqueue.cache_shard_size`` boot
parameter). Shards are always split on core (SMT group) boundaries.
This is the default affinity scope.
``numa``
CPUs are grouped according to NUMA boundaries.

View File

@@ -464,3 +464,7 @@ SPI
WATCHDOG
devm_watchdog_register_device()
WORKQUEUE
devm_alloc_workqueue()
devm_alloc_ordered_workqueue()

View File

@@ -423,7 +423,7 @@ static int __init efisubsys_init(void)
* ordered workqueue (which creates only one execution context)
* should suffice for all our needs.
*/
efi_rts_wq = alloc_ordered_workqueue("efi_rts_wq", 0);
efi_rts_wq = alloc_ordered_workqueue("efi_runtime", WQ_SYSFS);
if (!efi_rts_wq) {
pr_err("Creating efi_rts_wq failed, EFI runtime services disabled.\n");
clear_bit(EFI_RUNTIME_SERVICES, &efi.flags);

View File

@@ -131,8 +131,9 @@ struct rcu_work {
enum wq_affn_scope {
WQ_AFFN_DFL, /* use system default */
WQ_AFFN_CPU, /* one pod per CPU */
WQ_AFFN_SMT, /* one pod poer SMT */
WQ_AFFN_SMT, /* one pod per SMT */
WQ_AFFN_CACHE, /* one pod per LLC */
WQ_AFFN_CACHE_SHARD, /* synthetic sub-LLC shards */
WQ_AFFN_NUMA, /* one pod per NUMA node */
WQ_AFFN_SYSTEM, /* one pod across the whole system */
@@ -440,6 +441,9 @@ enum wq_consts {
* system_long_wq is similar to system_percpu_wq but may host long running
* works. Queue flushing might take relatively long.
*
* system_dfl_long_wq is similar to system_dfl_wq but it may host long running
* works.
*
* system_dfl_wq is unbound workqueue. Workers are not bound to
* any specific CPU, not concurrency managed, and all queued works are
* executed immediately as long as max_active limit is not reached and
@@ -468,6 +472,7 @@ extern struct workqueue_struct *system_power_efficient_wq;
extern struct workqueue_struct *system_freezable_power_efficient_wq;
extern struct workqueue_struct *system_bh_wq;
extern struct workqueue_struct *system_bh_highpri_wq;
extern struct workqueue_struct *system_dfl_long_wq;
void workqueue_softirq_action(bool highpri);
void workqueue_softirq_dead(unsigned int cpu);
@@ -512,6 +517,26 @@ __printf(1, 4) struct workqueue_struct *
alloc_workqueue_noprof(const char *fmt, unsigned int flags, int max_active, ...);
#define alloc_workqueue(...) alloc_hooks(alloc_workqueue_noprof(__VA_ARGS__))
/**
* devm_alloc_workqueue - Resource-managed allocate a workqueue
* @dev: Device to allocate workqueue for
* @fmt: printf format for the name of the workqueue
* @flags: WQ_* flags
* @max_active: max in-flight work items, 0 for default
* @...: args for @fmt
*
* Resource managed workqueue, see alloc_workqueue() for details.
*
* The workqueue will be automatically destroyed on driver detach. Typically
* this should be used in drivers already relying on devm interafaces.
*
* RETURNS:
* Pointer to the allocated workqueue on success, %NULL on failure.
*/
__printf(2, 5) struct workqueue_struct *
devm_alloc_workqueue(struct device *dev, const char *fmt, unsigned int flags,
int max_active, ...);
#ifdef CONFIG_LOCKDEP
/**
* alloc_workqueue_lockdep_map - allocate a workqueue with user-defined lockdep_map
@@ -568,6 +593,8 @@ alloc_workqueue_lockdep_map(const char *fmt, unsigned int flags, int max_active,
*/
#define alloc_ordered_workqueue(fmt, flags, args...) \
alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)
#define devm_alloc_ordered_workqueue(dev, fmt, flags, args...) \
devm_alloc_workqueue(dev, fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)
#define create_workqueue(name) \
alloc_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM | WQ_PERCPU, 1, (name))
@@ -712,14 +739,14 @@ static inline bool schedule_work_on(int cpu, struct work_struct *work)
}
/**
* schedule_work - put work task in global workqueue
* schedule_work - put work task in per-CPU workqueue
* @work: job to be done
*
* Returns %false if @work was already on the kernel-global workqueue and
* Returns %false if @work was already on the system per-CPU workqueue and
* %true otherwise.
*
* This puts a job in the kernel-global workqueue if it was not already
* queued and leaves it in the same position on the kernel-global
* This puts a job in the system per-CPU workqueue if it was not already
* queued and leaves it in the same position on the system per-CPU
* workqueue otherwise.
*
* Shares the same memory-ordering properties of queue_work(), cf. the
@@ -783,6 +810,8 @@ extern void __warn_flushing_systemwide_wq(void)
_wq == system_highpri_wq) || \
(__builtin_constant_p(_wq == system_long_wq) && \
_wq == system_long_wq) || \
(__builtin_constant_p(_wq == system_dfl_long_wq) && \
_wq == system_dfl_long_wq) || \
(__builtin_constant_p(_wq == system_dfl_wq) && \
_wq == system_dfl_wq) || \
(__builtin_constant_p(_wq == system_freezable_wq) && \
@@ -796,12 +825,12 @@ extern void __warn_flushing_systemwide_wq(void)
})
/**
* schedule_delayed_work_on - queue work in global workqueue on CPU after delay
* schedule_delayed_work_on - queue work in per-CPU workqueue on CPU after delay
* @cpu: cpu to use
* @dwork: job to be done
* @delay: number of jiffies to wait
*
* After waiting for a given time this puts a job in the kernel-global
* After waiting for a given time this puts a job in the system per-CPU
* workqueue on the specified CPU.
*/
static inline bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork,
@@ -811,11 +840,11 @@ static inline bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork,
}
/**
* schedule_delayed_work - put work task in global workqueue after delay
* schedule_delayed_work - put work task in per-CPU workqueue after delay
* @dwork: job to be done
* @delay: number of jiffies to wait or 0 for immediate execution
*
* After waiting for a given time this puts a job in the kernel-global
* After waiting for a given time this puts a job in the system per-CPU
* workqueue.
*/
static inline bool schedule_delayed_work(struct delayed_work *dwork,

View File

@@ -41,6 +41,7 @@
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/debug_locks.h>
#include <linux/device/devres.h>
#include <linux/lockdep.h>
#include <linux/idr.h>
#include <linux/jhash.h>
@@ -130,6 +131,14 @@ enum wq_internal_consts {
WORKER_ID_LEN = 10 + WQ_NAME_LEN, /* "kworker/R-" + WQ_NAME_LEN */
};
/* Layout of shards within one LLC pod */
struct llc_shard_layout {
int nr_large_shards; /* number of large shards (cores_per_shard + 1) */
int cores_per_shard; /* base number of cores per default shard */
int nr_shards; /* total number of shards */
/* nr_default shards = (nr_shards - nr_large_shards) */
};
/*
* We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and
* MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because
@@ -404,11 +413,12 @@ struct work_offq_data {
u32 flags;
};
static const char *wq_affn_names[WQ_AFFN_NR_TYPES] = {
static const char * const wq_affn_names[WQ_AFFN_NR_TYPES] = {
[WQ_AFFN_DFL] = "default",
[WQ_AFFN_CPU] = "cpu",
[WQ_AFFN_SMT] = "smt",
[WQ_AFFN_CACHE] = "cache",
[WQ_AFFN_CACHE_SHARD] = "cache_shard",
[WQ_AFFN_NUMA] = "numa",
[WQ_AFFN_SYSTEM] = "system",
};
@@ -431,13 +441,16 @@ module_param_named(cpu_intensive_warning_thresh, wq_cpu_intensive_warning_thresh
static bool wq_power_efficient = IS_ENABLED(CONFIG_WQ_POWER_EFFICIENT_DEFAULT);
module_param_named(power_efficient, wq_power_efficient, bool, 0444);
static unsigned int wq_cache_shard_size = 8;
module_param_named(cache_shard_size, wq_cache_shard_size, uint, 0444);
static bool wq_online; /* can kworkers be created yet? */
static bool wq_topo_initialized __read_mostly = false;
static struct kmem_cache *pwq_cache;
static struct wq_pod_type wq_pod_types[WQ_AFFN_NR_TYPES];
static enum wq_affn_scope wq_affn_dfl = WQ_AFFN_CACHE;
static enum wq_affn_scope wq_affn_dfl = WQ_AFFN_CACHE_SHARD;
/* buf for wq_update_unbound_pod_attrs(), protected by CPU hotplug exclusion */
static struct workqueue_attrs *unbound_wq_update_pwq_attrs_buf;
@@ -530,6 +543,8 @@ struct workqueue_struct *system_bh_wq;
EXPORT_SYMBOL_GPL(system_bh_wq);
struct workqueue_struct *system_bh_highpri_wq;
EXPORT_SYMBOL_GPL(system_bh_highpri_wq);
struct workqueue_struct *system_dfl_long_wq __ro_after_init;
EXPORT_SYMBOL_GPL(system_dfl_long_wq);
static int worker_thread(void *__worker);
static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
@@ -2519,7 +2534,6 @@ static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
struct timer_list *timer = &dwork->timer;
struct work_struct *work = &dwork->work;
WARN_ON_ONCE(!wq);
WARN_ON_ONCE(timer->function != delayed_work_timer_fn);
WARN_ON_ONCE(timer_pending(timer));
WARN_ON_ONCE(!list_empty(&work->entry));
@@ -5635,8 +5649,16 @@ enomem:
for_each_possible_cpu(cpu) {
struct pool_workqueue *pwq = *per_cpu_ptr(wq->cpu_pwq, cpu);
if (pwq)
if (pwq) {
/*
* Unlink pwq from wq->pwqs since link_pwq()
* may have already added it. wq->mutex is not
* needed as the wq has not been published yet.
*/
if (!list_empty(&pwq->pwqs_node))
list_del_rcu(&pwq->pwqs_node);
kmem_cache_free(pwq_cache, pwq);
}
}
free_percpu(wq->cpu_pwq);
wq->cpu_pwq = NULL;
@@ -5904,6 +5926,33 @@ struct workqueue_struct *alloc_workqueue_noprof(const char *fmt,
}
EXPORT_SYMBOL_GPL(alloc_workqueue_noprof);
static void devm_workqueue_release(void *res)
{
destroy_workqueue(res);
}
__printf(2, 5) struct workqueue_struct *
devm_alloc_workqueue(struct device *dev, const char *fmt, unsigned int flags,
int max_active, ...)
{
struct workqueue_struct *wq;
va_list args;
int ret;
va_start(args, max_active);
wq = alloc_workqueue(fmt, flags, max_active, args);
va_end(args);
if (!wq)
return NULL;
ret = devm_add_action_or_reset(dev, devm_workqueue_release, wq);
if (ret)
return NULL;
return wq;
}
EXPORT_SYMBOL_GPL(devm_alloc_workqueue);
#ifdef CONFIG_LOCKDEP
__printf(1, 5)
struct workqueue_struct *
@@ -7059,7 +7108,7 @@ int workqueue_unbound_housekeeping_update(const struct cpumask *hk)
/*
* If the operation fails, it will fall back to
* wq_requested_unbound_cpumask which is initially set to
* (HK_TYPE_WQ HK_TYPE_DOMAIN) house keeping mask and rewritten
* HK_TYPE_DOMAIN house keeping mask and rewritten
* by any subsequent write to workqueue/cpumask sysfs file.
*/
if (!cpumask_and(cpumask, wq_requested_unbound_cpumask, hk))
@@ -7078,13 +7127,7 @@ int workqueue_unbound_housekeeping_update(const struct cpumask *hk)
static int parse_affn_scope(const char *val)
{
int i;
for (i = 0; i < ARRAY_SIZE(wq_affn_names); i++) {
if (!strncasecmp(val, wq_affn_names[i], strlen(wq_affn_names[i])))
return i;
}
return -EINVAL;
return sysfs_match_string(wq_affn_names, val);
}
static int wq_affn_dfl_set(const char *val, const struct kernel_param *kp)
@@ -7191,7 +7234,26 @@ static struct attribute *wq_sysfs_attrs[] = {
&dev_attr_max_active.attr,
NULL,
};
ATTRIBUTE_GROUPS(wq_sysfs);
static umode_t wq_sysfs_is_visible(struct kobject *kobj, struct attribute *a, int n)
{
struct device *dev = kobj_to_dev(kobj);
struct workqueue_struct *wq = dev_to_wq(dev);
/*
* Adjusting max_active breaks ordering guarantee. Changing it has no
* effect on BH worker. Limit max_active to RO in such case.
*/
if (wq->flags & (WQ_BH | __WQ_ORDERED))
return 0444;
return a->mode;
}
static const struct attribute_group wq_sysfs_group = {
.is_visible = wq_sysfs_is_visible,
.attrs = wq_sysfs_attrs,
};
__ATTRIBUTE_GROUPS(wq_sysfs);
static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
char *buf)
@@ -7494,13 +7556,6 @@ int workqueue_sysfs_register(struct workqueue_struct *wq)
struct wq_device *wq_dev;
int ret;
/*
* Adjusting max_active breaks ordering guarantee. Disallow exposing
* ordered workqueues.
*/
if (WARN_ON(wq->flags & __WQ_ORDERED))
return -EINVAL;
wq->wq_dev = wq_dev = kzalloc_obj(*wq_dev);
if (!wq_dev)
return -ENOMEM;
@@ -7877,8 +7932,8 @@ void __init workqueue_init_early(void)
{
struct wq_pod_type *pt = &wq_pod_types[WQ_AFFN_SYSTEM];
int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
void (*irq_work_fns[2])(struct irq_work *) = { bh_pool_kick_normal,
bh_pool_kick_highpri };
void (*irq_work_fns[NR_STD_WORKER_POOLS])(struct irq_work *) =
{ bh_pool_kick_normal, bh_pool_kick_highpri };
int i, cpu;
BUILD_BUG_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
@@ -7890,7 +7945,6 @@ void __init workqueue_init_early(void)
cpumask_copy(wq_online_cpumask, cpu_online_mask);
cpumask_copy(wq_unbound_cpumask, cpu_possible_mask);
restrict_unbound_cpumask("HK_TYPE_WQ", housekeeping_cpumask(HK_TYPE_WQ));
restrict_unbound_cpumask("HK_TYPE_DOMAIN", housekeeping_cpumask(HK_TYPE_DOMAIN));
if (!cpumask_empty(&wq_cmdline_cpumask))
restrict_unbound_cpumask("workqueue.unbound_cpus", &wq_cmdline_cpumask);
@@ -7974,11 +8028,12 @@ void __init workqueue_init_early(void)
system_bh_wq = alloc_workqueue("events_bh", WQ_BH | WQ_PERCPU, 0);
system_bh_highpri_wq = alloc_workqueue("events_bh_highpri",
WQ_BH | WQ_HIGHPRI | WQ_PERCPU, 0);
system_dfl_long_wq = alloc_workqueue("events_dfl_long", WQ_UNBOUND, WQ_MAX_ACTIVE);
BUG_ON(!system_wq || !system_percpu_wq|| !system_highpri_wq || !system_long_wq ||
!system_unbound_wq || !system_freezable_wq || !system_dfl_wq ||
!system_power_efficient_wq ||
!system_freezable_power_efficient_wq ||
!system_bh_wq || !system_bh_highpri_wq);
!system_bh_wq || !system_bh_highpri_wq || !system_dfl_long_wq);
}
static void __init wq_cpu_intensive_thresh_init(void)
@@ -8144,6 +8199,186 @@ static bool __init cpus_share_numa(int cpu0, int cpu1)
return cpu_to_node(cpu0) == cpu_to_node(cpu1);
}
/* Maps each CPU to its shard index within the LLC pod it belongs to */
static int cpu_shard_id[NR_CPUS] __initdata;
/**
* llc_count_cores - count distinct cores (SMT groups) within an LLC pod
* @pod_cpus: the cpumask of CPUs in the LLC pod
* @smt_pods: the SMT pod type, used to identify sibling groups
*
* A core is represented by the lowest-numbered CPU in its SMT group. Returns
* the number of distinct cores found in @pod_cpus.
*/
static int __init llc_count_cores(const struct cpumask *pod_cpus,
struct wq_pod_type *smt_pods)
{
const struct cpumask *sibling_cpus;
int nr_cores = 0, c;
/*
* Count distinct cores by only counting the first CPU in each
* SMT sibling group.
*/
for_each_cpu(c, pod_cpus) {
sibling_cpus = smt_pods->pod_cpus[smt_pods->cpu_pod[c]];
if (cpumask_first(sibling_cpus) == c)
nr_cores++;
}
return nr_cores;
}
/*
* llc_shard_size - number of cores in a given shard
*
* Cores are spread as evenly as possible. The first @nr_large_shards shards are
* "large shards" with (cores_per_shard + 1) cores; the rest are "default
* shards" with cores_per_shard cores.
*/
static int __init llc_shard_size(int shard_id, int cores_per_shard, int nr_large_shards)
{
/* The first @nr_large_shards shards are large shards */
if (shard_id < nr_large_shards)
return cores_per_shard + 1;
/* The remaining shards are default shards */
return cores_per_shard;
}
/*
* llc_calc_shard_layout - compute the shard layout for an LLC pod
* @nr_cores: number of distinct cores in the LLC pod
*
* Chooses the number of shards that keeps average shard size closest to
* wq_cache_shard_size. Returns a struct describing the total number of shards,
* the base size of each, and how many are large shards.
*/
static struct llc_shard_layout __init llc_calc_shard_layout(int nr_cores)
{
struct llc_shard_layout layout;
/* Ensure at least one shard; pick the count closest to the target size */
layout.nr_shards = max(1, DIV_ROUND_CLOSEST(nr_cores, wq_cache_shard_size));
layout.cores_per_shard = nr_cores / layout.nr_shards;
layout.nr_large_shards = nr_cores % layout.nr_shards;
return layout;
}
/*
* llc_shard_is_full - check whether a shard has reached its core capacity
* @cores_in_shard: number of cores already assigned to this shard
* @shard_id: index of the shard being checked
* @layout: the shard layout computed by llc_calc_shard_layout()
*
* Returns true if @cores_in_shard equals the expected size for @shard_id.
*/
static bool __init llc_shard_is_full(int cores_in_shard, int shard_id,
const struct llc_shard_layout *layout)
{
return cores_in_shard == llc_shard_size(shard_id, layout->cores_per_shard,
layout->nr_large_shards);
}
/**
* llc_populate_cpu_shard_id - populate cpu_shard_id[] for each CPU in an LLC pod
* @pod_cpus: the cpumask of CPUs in the LLC pod
* @smt_pods: the SMT pod type, used to identify sibling groups
* @nr_cores: number of distinct cores in @pod_cpus (from llc_count_cores())
*
* Walks @pod_cpus in order. At each SMT group leader, advances to the next
* shard once the current shard is full. Results are written to cpu_shard_id[].
*/
static void __init llc_populate_cpu_shard_id(const struct cpumask *pod_cpus,
struct wq_pod_type *smt_pods,
int nr_cores)
{
struct llc_shard_layout layout = llc_calc_shard_layout(nr_cores);
const struct cpumask *sibling_cpus;
/* Count the number of cores in the current shard_id */
int cores_in_shard = 0;
unsigned int leader;
/* This is a cursor for the shards. Go from zero to nr_shards - 1*/
int shard_id = 0;
int c;
/* Iterate at every CPU for a given LLC pod, and assign it a shard */
for_each_cpu(c, pod_cpus) {
sibling_cpus = smt_pods->pod_cpus[smt_pods->cpu_pod[c]];
if (cpumask_first(sibling_cpus) == c) {
/* This is the CPU leader for the siblings */
if (llc_shard_is_full(cores_in_shard, shard_id, &layout)) {
shard_id++;
cores_in_shard = 0;
}
cores_in_shard++;
cpu_shard_id[c] = shard_id;
} else {
/*
* The siblings' shard MUST be the same as the leader.
* never split threads in the same core.
*/
leader = cpumask_first(sibling_cpus);
/*
* This check silences a Warray-bounds warning on UP
* configs where NR_CPUS=1 makes cpu_shard_id[]
* a single-element array, and the compiler can't
* prove the index is always 0.
*/
if (WARN_ON_ONCE(leader >= nr_cpu_ids))
continue;
cpu_shard_id[c] = cpu_shard_id[leader];
}
}
WARN_ON_ONCE(shard_id != (layout.nr_shards - 1));
}
/**
* precompute_cache_shard_ids - assign each CPU its shard index within its LLC
*
* Iterates over all LLC pods. For each pod, counts distinct cores then assigns
* shard indices to all CPUs in the pod. Must be called after WQ_AFFN_CACHE and
* WQ_AFFN_SMT have been initialized.
*/
static void __init precompute_cache_shard_ids(void)
{
struct wq_pod_type *llc_pods = &wq_pod_types[WQ_AFFN_CACHE];
struct wq_pod_type *smt_pods = &wq_pod_types[WQ_AFFN_SMT];
const struct cpumask *cpus_sharing_llc;
int nr_cores;
int pod;
if (!wq_cache_shard_size) {
pr_warn("workqueue: cache_shard_size must be > 0, setting to 1\n");
wq_cache_shard_size = 1;
}
for (pod = 0; pod < llc_pods->nr_pods; pod++) {
cpus_sharing_llc = llc_pods->pod_cpus[pod];
/* Number of cores in this given LLC */
nr_cores = llc_count_cores(cpus_sharing_llc, smt_pods);
llc_populate_cpu_shard_id(cpus_sharing_llc, smt_pods, nr_cores);
}
}
/*
* cpus_share_cache_shard - test whether two CPUs belong to the same cache shard
*
* Two CPUs share a cache shard if they are in the same LLC and have the same
* shard index. Used as the pod affinity callback for WQ_AFFN_CACHE_SHARD.
*/
static bool __init cpus_share_cache_shard(int cpu0, int cpu1)
{
if (!cpus_share_cache(cpu0, cpu1))
return false;
return cpu_shard_id[cpu0] == cpu_shard_id[cpu1];
}
/**
* workqueue_init_topology - initialize CPU pods for unbound workqueues
*
@@ -8159,6 +8394,8 @@ void __init workqueue_init_topology(void)
init_pod_type(&wq_pod_types[WQ_AFFN_CPU], cpus_dont_share);
init_pod_type(&wq_pod_types[WQ_AFFN_SMT], cpus_share_smt);
init_pod_type(&wq_pod_types[WQ_AFFN_CACHE], cpus_share_cache);
precompute_cache_shard_ids();
init_pod_type(&wq_pod_types[WQ_AFFN_CACHE_SHARD], cpus_share_cache_shard);
init_pod_type(&wq_pod_types[WQ_AFFN_NUMA], cpus_share_numa);
wq_topo_initialized = true;

View File

@@ -2636,6 +2636,16 @@ config TEST_VMALLOC
If unsure, say N.
config TEST_WORKQUEUE
tristate "Test module for stress/performance analysis of workqueue"
default n
help
This builds the "test_workqueue" module for benchmarking
workqueue throughput under contention. Useful for evaluating
affinity scope changes (e.g., cache_shard vs cache).
If unsure, say N.
config TEST_BPF
tristate "Test BPF filter functionality"
depends on m && NET

View File

@@ -79,6 +79,7 @@ UBSAN_SANITIZE_test_ubsan.o := y
obj-$(CONFIG_TEST_KSTRTOX) += test-kstrtox.o
obj-$(CONFIG_TEST_LKM) += test_module.o
obj-$(CONFIG_TEST_VMALLOC) += test_vmalloc.o
obj-$(CONFIG_TEST_WORKQUEUE) += test_workqueue.o
obj-$(CONFIG_TEST_RHASHTABLE) += test_rhashtable.o
obj-$(CONFIG_TEST_STATIC_KEYS) += test_static_keys.o
obj-$(CONFIG_TEST_STATIC_KEYS) += test_static_key_base.o

294
lib/test_workqueue.c Normal file
View File

@@ -0,0 +1,294 @@
// SPDX-License-Identifier: GPL-2.0
/*
* Test module for stress and performance analysis of workqueue.
*
* Benchmarks queue_work() throughput on an unbound workqueue to measure
* pool->lock contention under different affinity scope configurations
* (e.g., cache vs cache_shard).
*
* The affinity scope is changed between runs via the workqueue's sysfs
* affinity_scope attribute (WQ_SYSFS).
*
* Copyright (c) 2026 Meta Platforms, Inc. and affiliates
* Copyright (c) 2026 Breno Leitao <leitao@debian.org>
*
*/
#include <linux/init.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/workqueue.h>
#include <linux/kthread.h>
#include <linux/moduleparam.h>
#include <linux/completion.h>
#include <linux/atomic.h>
#include <linux/slab.h>
#include <linux/ktime.h>
#include <linux/cpumask.h>
#include <linux/sched.h>
#include <linux/sort.h>
#include <linux/fs.h>
#define WQ_NAME "bench_wq"
#define SCOPE_PATH "/sys/bus/workqueue/devices/" WQ_NAME "/affinity_scope"
static int nr_threads;
module_param(nr_threads, int, 0444);
MODULE_PARM_DESC(nr_threads,
"Number of threads to spawn (default: 0 = num_online_cpus())");
static int wq_items = 50000;
module_param(wq_items, int, 0444);
MODULE_PARM_DESC(wq_items,
"Number of work items each thread queues (default: 50000)");
static struct workqueue_struct *bench_wq;
static atomic_t threads_done;
static DECLARE_COMPLETION(start_comp);
static DECLARE_COMPLETION(all_done_comp);
struct thread_ctx {
struct completion work_done;
struct work_struct work;
u64 *latencies;
int cpu;
int items;
};
static void bench_work_fn(struct work_struct *work)
{
struct thread_ctx *ctx = container_of(work, struct thread_ctx, work);
complete(&ctx->work_done);
}
static int bench_kthread_fn(void *data)
{
struct thread_ctx *ctx = data;
ktime_t t_start, t_end;
int i;
/* Wait for all threads to be ready */
wait_for_completion(&start_comp);
if (kthread_should_stop())
return 0;
for (i = 0; i < ctx->items; i++) {
reinit_completion(&ctx->work_done);
INIT_WORK(&ctx->work, bench_work_fn);
t_start = ktime_get();
queue_work(bench_wq, &ctx->work);
t_end = ktime_get();
ctx->latencies[i] = ktime_to_ns(ktime_sub(t_end, t_start));
wait_for_completion(&ctx->work_done);
}
if (atomic_dec_and_test(&threads_done))
complete(&all_done_comp);
/*
* Wait for kthread_stop() so the module text isn't freed
* while we're still executing.
*/
while (!kthread_should_stop())
schedule();
return 0;
}
static int cmp_u64(const void *a, const void *b)
{
u64 va = *(const u64 *)a;
u64 vb = *(const u64 *)b;
if (va < vb)
return -1;
if (va > vb)
return 1;
return 0;
}
static int __init set_affn_scope(const char *scope)
{
struct file *f;
loff_t pos = 0;
ssize_t ret;
f = filp_open(SCOPE_PATH, O_WRONLY, 0);
if (IS_ERR(f)) {
pr_err("test_workqueue: open %s failed: %ld\n",
SCOPE_PATH, PTR_ERR(f));
return PTR_ERR(f);
}
ret = kernel_write(f, scope, strlen(scope), &pos);
filp_close(f, NULL);
if (ret < 0) {
pr_err("test_workqueue: write '%s' failed: %zd\n", scope, ret);
return ret;
}
return 0;
}
static int __init run_bench(int n_threads, const char *scope, const char *label)
{
struct task_struct **tasks;
unsigned long total_items;
struct thread_ctx *ctxs;
u64 *all_latencies;
ktime_t start, end;
int cpu, i, j, ret;
s64 elapsed_us;
ret = set_affn_scope(scope);
if (ret)
return ret;
ctxs = kcalloc(n_threads, sizeof(*ctxs), GFP_KERNEL);
if (!ctxs)
return -ENOMEM;
tasks = kcalloc(n_threads, sizeof(*tasks), GFP_KERNEL);
if (!tasks) {
kfree(ctxs);
return -ENOMEM;
}
total_items = (unsigned long)n_threads * wq_items;
all_latencies = kvmalloc_array(total_items, sizeof(u64), GFP_KERNEL);
if (!all_latencies) {
kfree(tasks);
kfree(ctxs);
return -ENOMEM;
}
/* Allocate per-thread latency arrays */
for (i = 0; i < n_threads; i++) {
ctxs[i].latencies = kvmalloc_array(wq_items, sizeof(u64),
GFP_KERNEL);
if (!ctxs[i].latencies) {
while (--i >= 0)
kvfree(ctxs[i].latencies);
kvfree(all_latencies);
kfree(tasks);
kfree(ctxs);
return -ENOMEM;
}
}
atomic_set(&threads_done, n_threads);
reinit_completion(&all_done_comp);
reinit_completion(&start_comp);
/* Create kthreads, each bound to a different online CPU */
i = 0;
for_each_online_cpu(cpu) {
if (i >= n_threads)
break;
ctxs[i].cpu = cpu;
ctxs[i].items = wq_items;
init_completion(&ctxs[i].work_done);
tasks[i] = kthread_create(bench_kthread_fn, &ctxs[i],
"wq_bench/%d", cpu);
if (IS_ERR(tasks[i])) {
ret = PTR_ERR(tasks[i]);
pr_err("test_workqueue: failed to create kthread %d: %d\n",
i, ret);
/* Unblock threads waiting on start_comp before stopping them */
complete_all(&start_comp);
while (--i >= 0)
kthread_stop(tasks[i]);
goto out_free;
}
kthread_bind(tasks[i], cpu);
wake_up_process(tasks[i]);
i++;
}
/* Start timing and release all threads */
start = ktime_get();
complete_all(&start_comp);
/* Wait for all threads to finish the benchmark */
wait_for_completion(&all_done_comp);
/* Drain any remaining work */
flush_workqueue(bench_wq);
/* Ensure all kthreads have fully exited before module memory is freed */
for (i = 0; i < n_threads; i++)
kthread_stop(tasks[i]);
end = ktime_get();
elapsed_us = ktime_us_delta(end, start);
/* Merge all per-thread latencies and sort for percentile calculation */
j = 0;
for (i = 0; i < n_threads; i++) {
memcpy(&all_latencies[j], ctxs[i].latencies,
wq_items * sizeof(u64));
j += wq_items;
}
sort(all_latencies, total_items, sizeof(u64), cmp_u64, NULL);
pr_info("test_workqueue: %-16s %llu items/sec\tp50=%llu\tp90=%llu\tp95=%llu ns\n",
label,
elapsed_us ? div_u64(total_items * 1000000ULL, elapsed_us) : 0,
all_latencies[total_items * 50 / 100],
all_latencies[total_items * 90 / 100],
all_latencies[total_items * 95 / 100]);
ret = 0;
out_free:
for (i = 0; i < n_threads; i++)
kvfree(ctxs[i].latencies);
kvfree(all_latencies);
kfree(tasks);
kfree(ctxs);
return ret;
}
static const char * const bench_scopes[] = {
"cpu", "smt", "cache_shard", "cache", "numa", "system",
};
static int __init test_workqueue_init(void)
{
int n_threads = min(nr_threads ?: num_online_cpus(), num_online_cpus());
int i;
if (wq_items <= 0) {
pr_err("test_workqueue: wq_items must be > 0\n");
return -EINVAL;
}
bench_wq = alloc_workqueue(WQ_NAME, WQ_UNBOUND | WQ_SYSFS, 0);
if (!bench_wq)
return -ENOMEM;
pr_info("test_workqueue: running %d threads, %d items/thread\n",
n_threads, wq_items);
for (i = 0; i < ARRAY_SIZE(bench_scopes); i++)
run_bench(n_threads, bench_scopes[i], bench_scopes[i]);
destroy_workqueue(bench_wq);
/* Return -EAGAIN so the module doesn't stay loaded after the benchmark */
return -EAGAIN;
}
module_init(test_workqueue_init);
MODULE_AUTHOR("Breno Leitao <leitao@debian.org>");
MODULE_DESCRIPTION("Stress/performance benchmark for workqueue subsystem");
MODULE_LICENSE("GPL");

View File

@@ -107,6 +107,7 @@ WQ_MEM_RECLAIM = prog['WQ_MEM_RECLAIM']
WQ_AFFN_CPU = prog['WQ_AFFN_CPU']
WQ_AFFN_SMT = prog['WQ_AFFN_SMT']
WQ_AFFN_CACHE = prog['WQ_AFFN_CACHE']
WQ_AFFN_CACHE_SHARD = prog['WQ_AFFN_CACHE_SHARD']
WQ_AFFN_NUMA = prog['WQ_AFFN_NUMA']
WQ_AFFN_SYSTEM = prog['WQ_AFFN_SYSTEM']
@@ -138,7 +139,7 @@ def print_pod_type(pt):
print(f' [{cpu}]={pt.cpu_pod[cpu].value_()}', end='')
print('')
for affn in [WQ_AFFN_CPU, WQ_AFFN_SMT, WQ_AFFN_CACHE, WQ_AFFN_NUMA, WQ_AFFN_SYSTEM]:
for affn in [WQ_AFFN_CPU, WQ_AFFN_SMT, WQ_AFFN_CACHE, WQ_AFFN_CACHE_SHARD, WQ_AFFN_NUMA, WQ_AFFN_SYSTEM]:
print('')
print(f'{wq_affn_names[affn].string_().decode().upper()}{" (default)" if affn == wq_affn_dfl else ""}')
print_pod_type(wq_pod_types[affn])
@@ -227,15 +228,10 @@ if 'node_to_cpumask_map' in prog:
print(f'NODE[{node:02}]={cpumask_str(node_to_cpumask_map[node])}')
print('')
print(f'[{"workqueue":^{WQ_NAME_LEN-2}}\\ min max', end='')
first = True
print(f'[{"workqueue":^{WQ_NAME_LEN-1}} {"min":>4} {"max":>4}', end='')
for node in for_each_node():
if first:
print(f' NODE {node}', end='')
first = False
else:
print(f' {node:7}', end='')
print(f' {"dfl":>7} ]')
print(f' {"NODE " + str(node):>9}', end='')
print(f' {"dfl":>9} ]')
print('')
for wq in list_for_each_entry('struct workqueue_struct', workqueues.address_of_(), 'list'):
@@ -243,11 +239,11 @@ if 'node_to_cpumask_map' in prog:
continue
print(f'{wq.name.string_().decode():{WQ_NAME_LEN}} ', end='')
print(f'{wq.min_active.value_():3} {wq.max_active.value_():3}', end='')
print(f'{wq.min_active.value_():4} {wq.max_active.value_():4}', end='')
for node in for_each_node():
nna = wq.node_nr_active[node]
print(f' {nna.nr.counter.value_():3}/{nna.max.value_():3}', end='')
print(f' {f"{nna.nr.counter.value_()}/{nna.max.value_()}":>9}', end='')
nna = wq.node_nr_active[nr_node_ids]
print(f' {nna.nr.counter.value_():3}/{nna.max.value_():3}')
print(f' {f"{nna.nr.counter.value_()}/{nna.max.value_()}":>9}')
else:
printf(f'node_to_cpumask_map not present, is NUMA enabled?')