[Openmp-commits] [openmp] [OpenMP] Adding a throttling threshold to bound dependent tasking memory footprint (PR #82274)
PEREIRA Romain via Openmp-commits
openmp-commits at lists.llvm.org
Mon Feb 19 11:27:42 PST 2024
https://github.com/rpereira-dev created https://github.com/llvm/llvm-project/pull/82274
Please refer to https://reviews.llvm.org/D158416
>From 459dfd35d47fbb0a1a7f2f0408febd18eb745f1b Mon Sep 17 00:00:00 2001
From: Romain Pereira <romain.pereira at inria.fr>
Date: Mon, 19 Feb 2024 20:21:51 +0100
Subject: [PATCH] [OpenMP] Adding a throttling threshold to bound dependent
tasking memory footprint
---
openmp/runtime/src/kmp.h | 4 ++
openmp/runtime/src/kmp_global.cpp | 13 +++-
openmp/runtime/src/kmp_settings.cpp | 37 ++++++++++-
openmp/runtime/src/kmp_tasking.cpp | 53 ++++++++++------
.../runtime/test/tasking/omp_throttling_max.c | 62 +++++++++++++++++++
.../omp_throttling_max_ready_per_thread.c | 62 +++++++++++++++++++
6 files changed, 211 insertions(+), 20 deletions(-)
create mode 100644 openmp/runtime/test/tasking/omp_throttling_max.c
create mode 100644 openmp/runtime/test/tasking/omp_throttling_max_ready_per_thread.c
diff --git a/openmp/runtime/src/kmp.h b/openmp/runtime/src/kmp.h
index 259c57b5afbca5..5409004a7e9d53 100644
--- a/openmp/runtime/src/kmp.h
+++ b/openmp/runtime/src/kmp.h
@@ -2422,7 +2422,11 @@ typedef enum kmp_tasking_mode {
extern kmp_tasking_mode_t
__kmp_tasking_mode; /* determines how/when to execute tasks */
extern int __kmp_task_stealing_constraint;
+extern std::atomic<kmp_int32> __kmp_n_tasks_in_flight;
extern int __kmp_enable_task_throttling;
+extern kmp_int32 __kmp_task_maximum;
+extern kmp_int32 __kmp_task_maximum_ready_per_thread;
+
extern kmp_int32 __kmp_default_device; // Set via OMP_DEFAULT_DEVICE if
// specified, defaults to 0 otherwise
// Set via OMP_MAX_TASK_PRIORITY if specified, defaults to 0 otherwise
diff --git a/openmp/runtime/src/kmp_global.cpp b/openmp/runtime/src/kmp_global.cpp
index 5017cd3de4be57..6dc9ac2d175246 100644
--- a/openmp/runtime/src/kmp_global.cpp
+++ b/openmp/runtime/src/kmp_global.cpp
@@ -353,8 +353,19 @@ omp_memspace_handle_t const llvm_omp_target_device_mem_space =
KMP_BUILD_ASSERT(sizeof(kmp_tasking_flags_t) == 4);
int __kmp_task_stealing_constraint = 1; /* Constrain task stealing by default */
-int __kmp_enable_task_throttling = 1;
+std::atomic<kmp_int32> __kmp_n_tasks_in_flight = 0; /* n° of tasks in flight */
+
+kmp_int32 __kmp_enable_task_throttling = 1; /* Serialize tasks once a threshold
+ is reached, such as the number of
+ ready tasks or the total number of
+ tasks */
+
+kmp_int32 __kmp_task_maximum = 65536; /* number of tasks threshold before
+ serializing */
+
+kmp_int32 __kmp_task_maximum_ready_per_thread = 256; /* number of ready tasks
+ before serializing */
#ifdef DEBUG_SUSPEND
int __kmp_suspend_count = 0;
#endif
diff --git a/openmp/runtime/src/kmp_settings.cpp b/openmp/runtime/src/kmp_settings.cpp
index ec86ee07472c1e..8491da4a3371f2 100644
--- a/openmp/runtime/src/kmp_settings.cpp
+++ b/openmp/runtime/src/kmp_settings.cpp
@@ -5360,6 +5360,33 @@ static void __kmp_stg_print_task_throttling(kmp_str_buf_t *buffer,
__kmp_stg_print_bool(buffer, name, __kmp_enable_task_throttling);
} // __kmp_stg_print_task_throttling
+// -----------------------------------------------------------------------------
+// KMP_TASK_MAXIMUM
+static void __kmp_stg_parse_task_maximum(char const *name, char const *value,
+ void *data) {
+ __kmp_stg_parse_int(name, value, 1, INT_MAX, &__kmp_task_maximum);
+} // __kmp_stg_parse_task_maximum
+
+static void __kmp_stg_print_task_maximum(kmp_str_buf_t *buffer,
+ char const *name, void *data) {
+ __kmp_stg_print_int(buffer, name, __kmp_task_maximum);
+} // __kmp_stg_print_task_maximum
+
+// -----------------------------------------------------------------------------
+// KMP_TASK_MAXIMUM_READY_PER_THREAD
+static void __kmp_stg_parse_task_maximum_ready_per_thread(char const *name,
+ char const *value,
+ void *data) {
+ __kmp_stg_parse_int(name, value, 1, INT_MAX,
+ &__kmp_task_maximum_ready_per_thread);
+} // __kmp_stg_parse_task_maximum_ready_per_thread
+
+static void __kmp_stg_print_task_maximum_ready_per_thread(kmp_str_buf_t *buffer,
+ char const *name,
+ void *data) {
+ __kmp_stg_print_int(buffer, name, __kmp_task_maximum_ready_per_thread);
+} // __kmp_stg_print_task_maximum_ready_per_thread
+
#if KMP_HAVE_MWAIT || KMP_HAVE_UMWAIT
// -----------------------------------------------------------------------------
// KMP_USER_LEVEL_MWAIT
@@ -5750,6 +5777,13 @@ static kmp_setting_t __kmp_stg_table[] = {
{"KMP_ENABLE_TASK_THROTTLING", __kmp_stg_parse_task_throttling,
__kmp_stg_print_task_throttling, NULL, 0, 0},
+ {"KMP_TASK_MAXIMUM", __kmp_stg_parse_task_maximum,
+ __kmp_stg_print_task_maximum, NULL, 0, 0},
+
+ {"KMP_TASK_MAXIMUM_READY_PER_THREAD",
+ __kmp_stg_parse_task_maximum_ready_per_thread,
+ __kmp_stg_print_task_maximum_ready_per_thread, NULL, 0, 0},
+
{"OMP_DISPLAY_ENV", __kmp_stg_parse_omp_display_env,
__kmp_stg_print_omp_display_env, NULL, 0, 0},
{"OMP_CANCELLATION", __kmp_stg_parse_omp_cancellation,
@@ -5764,7 +5798,8 @@ static kmp_setting_t __kmp_stg_table[] = {
#if OMPX_TASKGRAPH
{"KMP_MAX_TDGS", __kmp_stg_parse_max_tdgs, __kmp_std_print_max_tdgs, NULL,
0, 0},
- {"KMP_TDG_DOT", __kmp_stg_parse_tdg_dot, __kmp_stg_print_tdg_dot, NULL, 0, 0},
+ {"KMP_TDG_DOT", __kmp_stg_parse_tdg_dot, __kmp_stg_print_tdg_dot, NULL, 0,
+ 0},
#endif
#if OMPT_SUPPORT
diff --git a/openmp/runtime/src/kmp_tasking.cpp b/openmp/runtime/src/kmp_tasking.cpp
index 6e8b948efa064f..9cfb0486fc71da 100644
--- a/openmp/runtime/src/kmp_tasking.cpp
+++ b/openmp/runtime/src/kmp_tasking.cpp
@@ -438,10 +438,9 @@ static kmp_int32 __kmp_push_priority_task(kmp_int32 gtid, kmp_info_t *thread,
__kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
// Check if deque is full
- if (TCR_4(thread_data->td.td_deque_ntasks) >=
- TASK_DEQUE_SIZE(thread_data->td)) {
- if (__kmp_enable_task_throttling &&
- __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
+ if (__kmp_enable_task_throttling && TCR_4(thread_data->td.td_deque_ntasks) >=
+ __kmp_task_maximum_ready_per_thread) {
+ if (__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
thread->th.th_current_task)) {
__kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
KA_TRACE(20, ("__kmp_push_priority_task: T#%d deque is full; returning "
@@ -543,40 +542,51 @@ static kmp_int32 __kmp_push_task(kmp_int32 gtid, kmp_task_t *task) {
int locked = 0;
// Check if deque is full
- if (TCR_4(thread_data->td.td_deque_ntasks) >=
- TASK_DEQUE_SIZE(thread_data->td)) {
- if (__kmp_enable_task_throttling &&
+ int requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >=
+ TASK_DEQUE_SIZE(thread_data->td);
+ int requires_throttling =
+ __kmp_enable_task_throttling && TCR_4(thread_data->td.td_deque_ntasks) >=
+ __kmp_task_maximum_ready_per_thread;
+ int thread_can_execute;
+ if (requires_resize || requires_throttling) {
+ thread_can_execute =
__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
- thread->th.th_current_task)) {
+ thread->th.th_current_task);
+ if (requires_throttling && thread_can_execute) {
KA_TRACE(20, ("__kmp_push_task: T#%d deque is full; returning "
"TASK_NOT_PUSHED for task %p\n",
gtid, taskdata));
return TASK_NOT_PUSHED;
- } else {
+ } else { /* maybe requires_resize */
__kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
locked = 1;
- if (TCR_4(thread_data->td.td_deque_ntasks) >=
- TASK_DEQUE_SIZE(thread_data->td)) {
- // expand deque to push the task which is not allowed to execute
+ requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >=
+ TASK_DEQUE_SIZE(thread_data->td);
+ // expand deque to push the task which is not allowed to execute
+ if (requires_resize)
__kmp_realloc_task_deque(thread, thread_data);
- }
}
}
// Lock the deque for the task push operation
if (!locked) {
__kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
// Need to recheck as we can get a proxy task from thread outside of OpenMP
- if (TCR_4(thread_data->td.td_deque_ntasks) >=
- TASK_DEQUE_SIZE(thread_data->td)) {
- if (__kmp_enable_task_throttling &&
+ requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >=
+ TASK_DEQUE_SIZE(thread_data->td);
+ requires_throttling = __kmp_enable_task_throttling &&
+ TCR_4(thread_data->td.td_deque_ntasks) >=
+ __kmp_task_maximum_ready_per_thread;
+ if (requires_resize || requires_throttling) {
+ thread_can_execute =
__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
- thread->th.th_current_task)) {
+ thread->th.th_current_task);
+ if (requires_throttling && thread_can_execute) {
__kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
KA_TRACE(20, ("__kmp_push_task: T#%d deque is full on 2nd check; "
"returning TASK_NOT_PUSHED for task %p\n",
gtid, taskdata));
return TASK_NOT_PUSHED;
- } else {
+ } else { /* requires_resize */
// expand deque to push the task which is not allowed to execute
__kmp_realloc_task_deque(thread, thread_data);
}
@@ -914,6 +924,7 @@ static void __kmp_free_task(kmp_int32 gtid, kmp_taskdata_t *taskdata,
#else /* ! USE_FAST_MEMORY */
__kmp_thread_free(thread, taskdata);
#endif
+ --__kmp_n_tasks_in_flight;
#if OMPX_TASKGRAPH
} else {
taskdata->td_flags.complete = 0;
@@ -1464,6 +1475,11 @@ kmp_task_t *__kmp_task_alloc(ident_t *loc_ref, kmp_int32 gtid,
if (UNLIKELY(!TCR_4(__kmp_init_middle)))
__kmp_middle_initialize();
+ // task throttling: too many tasks co-existing, emptying queue now
+ if (__kmp_enable_task_throttling)
+ while (TCR_4(__kmp_n_tasks_in_flight.load()) >= __kmp_task_maximum)
+ __kmpc_omp_taskyield(NULL, gtid, 0);
+
if (flags->hidden_helper) {
if (__kmp_enable_hidden_helper) {
if (!TCR_4(__kmp_init_hidden_helper))
@@ -1558,6 +1574,7 @@ kmp_task_t *__kmp_task_alloc(ident_t *loc_ref, kmp_int32 gtid,
taskdata = (kmp_taskdata_t *)__kmp_thread_malloc(thread, shareds_offset +
sizeof_shareds);
#endif /* USE_FAST_MEMORY */
+ ++__kmp_n_tasks_in_flight;
task = KMP_TASKDATA_TO_TASK(taskdata);
diff --git a/openmp/runtime/test/tasking/omp_throttling_max.c b/openmp/runtime/test/tasking/omp_throttling_max.c
new file mode 100644
index 00000000000000..582927c713fd34
--- /dev/null
+++ b/openmp/runtime/test/tasking/omp_throttling_max.c
@@ -0,0 +1,62 @@
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=0 %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=1 %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=256 %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=65536 %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=100000 %libomp-run
+
+/**
+ * This test ensures that task throttling on the maximum number of tasks
+ * threshold works properly.
+ *
+ * It creates 2 threads (1 producer, 1 consumer).
+ * The producer keeps creating tasks 'T_i' until one of them is executed.
+ * The consumer is blocked until the producer starts throttling.
+ * Executing any 'T_i' unblocks the consumer and stops the producer.
+ *
+ * The assertion ensures that the producer does not create more than the
+ * total number of tasks provided by the programmer.
+ */
+
+#include <assert.h>
+#include <omp.h>
+#include <stdlib.h>
+
+/* default value */
+#define MAX_TASKS_DEFAULT (65536)
+
+int main(void) {
+ /* maximum number of tasks in-flight */
+ char *max_tasks_str = getenv("KMP_TASK_MAXIMUM");
+ int max_tasks = max_tasks_str ? atoi(max_tasks_str) : MAX_TASKS_DEFAULT;
+ if (max_tasks <= 0)
+ max_tasks = 1;
+
+ /* check if throttling is enabled (it is by default) */
+ char *throttling_str = getenv("KMP_ENABLE_TASK_THROTTLING");
+ int throttling = throttling_str ? *throttling_str == '1' : 1;
+ assert(throttling);
+
+ volatile int done = 0;
+
+/* testing KMP_TASK_MAXIMUM */
+#pragma omp parallel num_threads(2) default(none) \
+ shared(max_tasks, throttling, done)
+ {
+ if (omp_get_thread_num() == 1)
+ while (!done)
+ ;
+
+#pragma omp master
+ {
+ int ntasks = 0;
+ while (!done) {
+#pragma omp task default(none) shared(done) depend(out : max_tasks, throttling)
+ done = 1;
+
+ assert(++ntasks <= max_tasks + 1);
+ }
+ }
+ }
+
+ return 0;
+}
diff --git a/openmp/runtime/test/tasking/omp_throttling_max_ready_per_thread.c b/openmp/runtime/test/tasking/omp_throttling_max_ready_per_thread.c
new file mode 100644
index 00000000000000..6d801971d7af19
--- /dev/null
+++ b/openmp/runtime/test/tasking/omp_throttling_max_ready_per_thread.c
@@ -0,0 +1,62 @@
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=0 %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=1 %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=256 %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=65536 %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=100000 %libomp-run
+
+/**
+ * This test ensures that task throttling on the maximum number of ready tasks
+ * per thread threshold works properly.
+ *
+ * It creates 2 threads (1 producer, 1 consumer).
+ * The producer keeps creating tasks 'T_i' until one of them is executed.
+ * The consumer is blocked until the producer starts throttling.
+ * Executing any 'T_i' unblocks the consumer and stops the producer.
+ *
+ * The assertion ensures that the producer does not create more than the
+ * total number of tasks provided by the programmer.
+ */
+
+#include <assert.h>
+#include <omp.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#define MAX_TASKS_READY_DEFAULT (1 << 8)
+
+int main(void) {
+ /* maximum number of ready tasks in-flight */
+ char *max_tasks_ready_str = getenv("KMP_TASK_MAXIMUM_READY_PER_THREAD");
+ int max_tasks_ready =
+ max_tasks_ready_str ? atoi(max_tasks_ready_str) : MAX_TASKS_READY_DEFAULT;
+ if (max_tasks_ready <= 0)
+ max_tasks_ready = 1;
+
+ /* check if throttling is enabled (it is by default) */
+ char *throttling_str = getenv("KMP_ENABLE_TASK_THROTTLING");
+ int throttling = throttling_str ? *throttling_str == '1' : 1;
+
+ volatile int done = 0;
+
+/* testing KMP_TASK_MAXIMUM_READY_PER_THREAD */
+#pragma omp parallel num_threads(2) default(none) \
+ shared(max_tasks_ready, throttling, done)
+ {
+ if (omp_get_thread_num() == 1)
+ while (!done)
+ ;
+
+#pragma omp master
+ {
+ int ntasks = 0;
+ while (!done) {
+#pragma omp task default(none) shared(done)
+ done = 1;
+
+ assert(++ntasks <= max_tasks_ready + 1);
+ }
+ }
+ }
+
+ return 0;
+}
More information about the Openmp-commits
mailing list