[Openmp-commits] [openmp] [OpenMP] Adding a throttling threshold to bound dependent tasking mem… (PR #82274)

PEREIRA Romain via Openmp-commits openmp-commits at lists.llvm.org
Mon Feb 19 11:27:42 PST 2024


https://github.com/rpereira-dev created https://github.com/llvm/llvm-project/pull/82274

Please refer to https://reviews.llvm.org/D158416


>From 459dfd35d47fbb0a1a7f2f0408febd18eb745f1b Mon Sep 17 00:00:00 2001
From: Romain Pereira <romain.pereira at inria.fr>
Date: Mon, 19 Feb 2024 20:21:51 +0100
Subject: [PATCH] [OpenMP] Adding a throttling threshold to bound dependent
 tasking memory footprint

---
 openmp/runtime/src/kmp.h                      |  4 ++
 openmp/runtime/src/kmp_global.cpp             | 13 +++-
 openmp/runtime/src/kmp_settings.cpp           | 37 ++++++++++-
 openmp/runtime/src/kmp_tasking.cpp            | 53 ++++++++++------
 .../runtime/test/tasking/omp_throttling_max.c | 62 +++++++++++++++++++
 .../omp_throttling_max_ready_per_thread.c     | 62 +++++++++++++++++++
 6 files changed, 211 insertions(+), 20 deletions(-)
 create mode 100644 openmp/runtime/test/tasking/omp_throttling_max.c
 create mode 100644 openmp/runtime/test/tasking/omp_throttling_max_ready_per_thread.c

diff --git a/openmp/runtime/src/kmp.h b/openmp/runtime/src/kmp.h
index 259c57b5afbca5..5409004a7e9d53 100644
--- a/openmp/runtime/src/kmp.h
+++ b/openmp/runtime/src/kmp.h
@@ -2422,7 +2422,11 @@ typedef enum kmp_tasking_mode {
 extern kmp_tasking_mode_t
     __kmp_tasking_mode; /* determines how/when to execute tasks */
 extern int __kmp_task_stealing_constraint;
+extern std::atomic<kmp_int32> __kmp_n_tasks_in_flight;
 extern int __kmp_enable_task_throttling;
+extern kmp_int32 __kmp_task_maximum;
+extern kmp_int32 __kmp_task_maximum_ready_per_thread;
+
 extern kmp_int32 __kmp_default_device; // Set via OMP_DEFAULT_DEVICE if
 // specified, defaults to 0 otherwise
 // Set via OMP_MAX_TASK_PRIORITY if specified, defaults to 0 otherwise
diff --git a/openmp/runtime/src/kmp_global.cpp b/openmp/runtime/src/kmp_global.cpp
index 5017cd3de4be57..6dc9ac2d175246 100644
--- a/openmp/runtime/src/kmp_global.cpp
+++ b/openmp/runtime/src/kmp_global.cpp
@@ -353,8 +353,19 @@ omp_memspace_handle_t const llvm_omp_target_device_mem_space =
 KMP_BUILD_ASSERT(sizeof(kmp_tasking_flags_t) == 4);
 
 int __kmp_task_stealing_constraint = 1; /* Constrain task stealing by default */
-int __kmp_enable_task_throttling = 1;
 
+std::atomic<kmp_int32> __kmp_n_tasks_in_flight = 0; /* # of tasks in flight */
+
+kmp_int32 __kmp_enable_task_throttling = 1; /* Serialize tasks once a threshold
+                                            is reached, such as the number of
+                                            ready tasks or the total number of
+                                            tasks */
+
+kmp_int32 __kmp_task_maximum = 65536; /* max number of tasks in flight before
+                                         task creation is throttled */
+
+kmp_int32 __kmp_task_maximum_ready_per_thread = 256; /* number of ready tasks
+                                                        before serializing */
 #ifdef DEBUG_SUSPEND
 int __kmp_suspend_count = 0;
 #endif
diff --git a/openmp/runtime/src/kmp_settings.cpp b/openmp/runtime/src/kmp_settings.cpp
index ec86ee07472c1e..8491da4a3371f2 100644
--- a/openmp/runtime/src/kmp_settings.cpp
+++ b/openmp/runtime/src/kmp_settings.cpp
@@ -5360,6 +5360,33 @@ static void __kmp_stg_print_task_throttling(kmp_str_buf_t *buffer,
   __kmp_stg_print_bool(buffer, name, __kmp_enable_task_throttling);
 } // __kmp_stg_print_task_throttling
 
+// -----------------------------------------------------------------------------
+// KMP_TASK_MAXIMUM
+static void __kmp_stg_parse_task_maximum(char const *name, char const *value,
+                                         void *data) {
+  __kmp_stg_parse_int(name, value, 1, INT_MAX, &__kmp_task_maximum);
+} // __kmp_stg_parse_task_maximum
+
+static void __kmp_stg_print_task_maximum(kmp_str_buf_t *buffer,
+                                         char const *name, void *data) {
+  __kmp_stg_print_int(buffer, name, __kmp_task_maximum);
+} // __kmp_stg_print_task_maximum
+
+// -----------------------------------------------------------------------------
+// KMP_TASK_MAXIMUM_READY_PER_THREAD
+static void __kmp_stg_parse_task_maximum_ready_per_thread(char const *name,
+                                                          char const *value,
+                                                          void *data) {
+  __kmp_stg_parse_int(name, value, 1, INT_MAX,
+                      &__kmp_task_maximum_ready_per_thread);
+} // __kmp_stg_parse_task_maximum_ready_per_thread
+
+static void __kmp_stg_print_task_maximum_ready_per_thread(kmp_str_buf_t *buffer,
+                                                          char const *name,
+                                                          void *data) {
+  __kmp_stg_print_int(buffer, name, __kmp_task_maximum_ready_per_thread);
+} // __kmp_stg_print_task_maximum_ready_per_thread
+
 #if KMP_HAVE_MWAIT || KMP_HAVE_UMWAIT
 // -----------------------------------------------------------------------------
 // KMP_USER_LEVEL_MWAIT
@@ -5750,6 +5777,13 @@ static kmp_setting_t __kmp_stg_table[] = {
     {"KMP_ENABLE_TASK_THROTTLING", __kmp_stg_parse_task_throttling,
      __kmp_stg_print_task_throttling, NULL, 0, 0},
 
+    {"KMP_TASK_MAXIMUM", __kmp_stg_parse_task_maximum,
+     __kmp_stg_print_task_maximum, NULL, 0, 0},
+
+    {"KMP_TASK_MAXIMUM_READY_PER_THREAD",
+     __kmp_stg_parse_task_maximum_ready_per_thread,
+     __kmp_stg_print_task_maximum_ready_per_thread, NULL, 0, 0},
+
     {"OMP_DISPLAY_ENV", __kmp_stg_parse_omp_display_env,
      __kmp_stg_print_omp_display_env, NULL, 0, 0},
     {"OMP_CANCELLATION", __kmp_stg_parse_omp_cancellation,
@@ -5764,7 +5798,8 @@ static kmp_setting_t __kmp_stg_table[] = {
 #if OMPX_TASKGRAPH
     {"KMP_MAX_TDGS", __kmp_stg_parse_max_tdgs, __kmp_std_print_max_tdgs, NULL,
      0, 0},
-    {"KMP_TDG_DOT", __kmp_stg_parse_tdg_dot, __kmp_stg_print_tdg_dot, NULL, 0, 0},
+    {"KMP_TDG_DOT", __kmp_stg_parse_tdg_dot, __kmp_stg_print_tdg_dot, NULL, 0,
+     0},
 #endif
 
 #if OMPT_SUPPORT
diff --git a/openmp/runtime/src/kmp_tasking.cpp b/openmp/runtime/src/kmp_tasking.cpp
index 6e8b948efa064f..9cfb0486fc71da 100644
--- a/openmp/runtime/src/kmp_tasking.cpp
+++ b/openmp/runtime/src/kmp_tasking.cpp
@@ -438,10 +438,9 @@ static kmp_int32 __kmp_push_priority_task(kmp_int32 gtid, kmp_info_t *thread,
 
   __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
   // Check if deque is full
-  if (TCR_4(thread_data->td.td_deque_ntasks) >=
-      TASK_DEQUE_SIZE(thread_data->td)) {
-    if (__kmp_enable_task_throttling &&
-        __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
+  if (__kmp_enable_task_throttling && TCR_4(thread_data->td.td_deque_ntasks) >=
+                                          __kmp_task_maximum_ready_per_thread) {
+    if (__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
                               thread->th.th_current_task)) {
       __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
       KA_TRACE(20, ("__kmp_push_priority_task: T#%d deque is full; returning "
@@ -543,40 +542,51 @@ static kmp_int32 __kmp_push_task(kmp_int32 gtid, kmp_task_t *task) {
 
   int locked = 0;
   // Check if deque is full
-  if (TCR_4(thread_data->td.td_deque_ntasks) >=
-      TASK_DEQUE_SIZE(thread_data->td)) {
-    if (__kmp_enable_task_throttling &&
+  int requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >=
+                        TASK_DEQUE_SIZE(thread_data->td);
+  int requires_throttling =
+      __kmp_enable_task_throttling && TCR_4(thread_data->td.td_deque_ntasks) >=
+                                          __kmp_task_maximum_ready_per_thread;
+  int thread_can_execute;
+  if (requires_resize || requires_throttling) {
+    thread_can_execute =
         __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
-                              thread->th.th_current_task)) {
+                              thread->th.th_current_task);
+    if (requires_throttling && thread_can_execute) {
       KA_TRACE(20, ("__kmp_push_task: T#%d deque is full; returning "
                     "TASK_NOT_PUSHED for task %p\n",
                     gtid, taskdata));
       return TASK_NOT_PUSHED;
-    } else {
+    } else { /* maybe requires_resize */
       __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
       locked = 1;
-      if (TCR_4(thread_data->td.td_deque_ntasks) >=
-          TASK_DEQUE_SIZE(thread_data->td)) {
-        // expand deque to push the task which is not allowed to execute
+      requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >=
+                        TASK_DEQUE_SIZE(thread_data->td);
+      // expand deque to push the task which is not allowed to execute
+      if (requires_resize)
         __kmp_realloc_task_deque(thread, thread_data);
-      }
     }
   }
   // Lock the deque for the task push operation
   if (!locked) {
     __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
     // Need to recheck as we can get a proxy task from thread outside of OpenMP
-    if (TCR_4(thread_data->td.td_deque_ntasks) >=
-        TASK_DEQUE_SIZE(thread_data->td)) {
-      if (__kmp_enable_task_throttling &&
+    requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >=
+                      TASK_DEQUE_SIZE(thread_data->td);
+    requires_throttling = __kmp_enable_task_throttling &&
+                          TCR_4(thread_data->td.td_deque_ntasks) >=
+                              __kmp_task_maximum_ready_per_thread;
+    if (requires_resize || requires_throttling) {
+      thread_can_execute =
           __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
-                                thread->th.th_current_task)) {
+                                thread->th.th_current_task);
+      if (requires_throttling && thread_can_execute) {
         __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
         KA_TRACE(20, ("__kmp_push_task: T#%d deque is full on 2nd check; "
                       "returning TASK_NOT_PUSHED for task %p\n",
                       gtid, taskdata));
         return TASK_NOT_PUSHED;
-      } else {
+      } else { /* requires_resize */
         // expand deque to push the task which is not allowed to execute
         __kmp_realloc_task_deque(thread, thread_data);
       }
@@ -914,6 +924,7 @@ static void __kmp_free_task(kmp_int32 gtid, kmp_taskdata_t *taskdata,
 #else /* ! USE_FAST_MEMORY */
   __kmp_thread_free(thread, taskdata);
 #endif
+  --__kmp_n_tasks_in_flight;
 #if OMPX_TASKGRAPH
   } else {
     taskdata->td_flags.complete = 0;
@@ -1464,6 +1475,11 @@ kmp_task_t *__kmp_task_alloc(ident_t *loc_ref, kmp_int32 gtid,
   if (UNLIKELY(!TCR_4(__kmp_init_middle)))
     __kmp_middle_initialize();
 
+  // task throttling: too many tasks co-existing, emptying queue now
+  if (__kmp_enable_task_throttling)
+    while (TCR_4(__kmp_n_tasks_in_flight.load()) >= __kmp_task_maximum)
+      __kmpc_omp_taskyield(NULL, gtid, 0);
+
   if (flags->hidden_helper) {
     if (__kmp_enable_hidden_helper) {
       if (!TCR_4(__kmp_init_hidden_helper))
@@ -1558,6 +1574,7 @@ kmp_task_t *__kmp_task_alloc(ident_t *loc_ref, kmp_int32 gtid,
   taskdata = (kmp_taskdata_t *)__kmp_thread_malloc(thread, shareds_offset +
                                                                sizeof_shareds);
 #endif /* USE_FAST_MEMORY */
+  ++__kmp_n_tasks_in_flight;
 
   task = KMP_TASKDATA_TO_TASK(taskdata);
 
diff --git a/openmp/runtime/test/tasking/omp_throttling_max.c b/openmp/runtime/test/tasking/omp_throttling_max.c
new file mode 100644
index 00000000000000..582927c713fd34
--- /dev/null
+++ b/openmp/runtime/test/tasking/omp_throttling_max.c
@@ -0,0 +1,62 @@
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=0      %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=1      %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=256    %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=65536  %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=100000 %libomp-run
+
+/**
+ *  This test ensures that task throttling on the maximum number of tasks
+ *  threshold works properly.
+ *
+ *  It creates 2 threads (1 producer, 1 consumer)
+ *  The producer keeps creating tasks 'T_i' until one is executed
+ *  The consumer is blocked until the producer starts throttling
+ *  Executing any 'T_i' unblocks the consumer and stops the producer
+ *
+ *  The assertion ensures that the producer does not create more than the
+ *  total number of tasks provided by the programmer
+ */
+
+#include <assert.h>
+#include <omp.h>
+#include <stdlib.h>
+
+/* default value */
+#define MAX_TASKS_DEFAULT (65536)
+
+int main(void) {
+  /* maximum number of tasks in-flight */
+  char *max_tasks_str = getenv("KMP_TASK_MAXIMUM");
+  int max_tasks = max_tasks_str ? atoi(max_tasks_str) : MAX_TASKS_DEFAULT;
+  if (max_tasks <= 0)
+    max_tasks = 1;
+
+  /* check if throttling is enabled (it is by default) */
+  char *throttling_str = getenv("KMP_ENABLE_TASK_THROTTLING");
+  int throttling = throttling_str ? *throttling_str == '1' : 1;
+  assert(throttling);
+
+  volatile int done = 0;
+
+/* testing KMP_TASK_MAXIMUM */
+#pragma omp parallel num_threads(2) default(none)                              \
+    shared(max_tasks, throttling, done)
+  {
+    if (omp_get_thread_num() == 1)
+      while (!done)
+        ;
+
+#pragma omp master
+    {
+      int ntasks = 0;
+      while (!done) {
+#pragma omp task default(none) shared(done) depend(out : max_tasks, throttling)
+        done = 1;
+
+        assert(++ntasks <= max_tasks + 1);
+      }
+    }
+  }
+
+  return 0;
+}
diff --git a/openmp/runtime/test/tasking/omp_throttling_max_ready_per_thread.c b/openmp/runtime/test/tasking/omp_throttling_max_ready_per_thread.c
new file mode 100644
index 00000000000000..6d801971d7af19
--- /dev/null
+++ b/openmp/runtime/test/tasking/omp_throttling_max_ready_per_thread.c
@@ -0,0 +1,62 @@
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=0      %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=1      %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=256    %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=65536  %libomp-run
+// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=100000 %libomp-run
+
+/**
+ *  This test ensures that task throttling on the maximum number of ready tasks
+ *  per thread threshold works properly.
+ *
+ *  It creates 2 threads (1 producer, 1 consumer)
+ *  The producer keeps creating tasks 'T_i' until one is executed
+ *  The consumer is blocked until the producer starts throttling
+ *  Executing any 'T_i' unblocks the consumer and stops the producer
+ *
+ *  The assertion ensures that the producer does not create more than the
+ *  total number of tasks provided by the programmer
+ */
+
+#include <assert.h>
+#include <omp.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#define MAX_TASKS_READY_DEFAULT (1 << 8)
+
+int main(void) {
+  /* maximum number of ready tasks in-flight */
+  char *max_tasks_ready_str = getenv("KMP_TASK_MAXIMUM_READY_PER_THREAD");
+  int max_tasks_ready =
+      max_tasks_ready_str ? atoi(max_tasks_ready_str) : MAX_TASKS_READY_DEFAULT;
+  if (max_tasks_ready <= 0)
+    max_tasks_ready = 1;
+
+  /* check if throttling is enabled (it is by default) */
+  char *throttling_str = getenv("KMP_ENABLE_TASK_THROTTLING");
+  int throttling = throttling_str ? *throttling_str == '1' : 1;
+
+  volatile int done = 0;
+
+/* testing KMP_TASK_MAXIMUM_READY_PER_THREAD */
+#pragma omp parallel num_threads(2) default(none)                              \
+    shared(max_tasks_ready, throttling, done)
+  {
+    if (omp_get_thread_num() == 1)
+      while (!done)
+        ;
+
+#pragma omp master
+    {
+      int ntasks = 0;
+      while (!done) {
+#pragma omp task default(none) shared(done)
+        done = 1;
+
+        assert(++ntasks <= max_tasks_ready + 1);
+      }
+    }
+  }
+
+  return 0;
+}



More information about the Openmp-commits mailing list