[Openmp-commits] [openmp] 08d8f1a - [OpenMP][Tools] Cleanup memory pool used in Archer
Joachim Protze via Openmp-commits
openmp-commits at lists.llvm.org
Wed Jun 9 04:38:32 PDT 2021
Author: Joachim Protze
Date: 2021-06-09T13:36:19+02:00
New Revision: 08d8f1a958bd8be681e3e1f346be80818a83a556
URL: https://github.com/llvm/llvm-project/commit/08d8f1a958bd8be681e3e1f346be80818a83a556
DIFF: https://github.com/llvm/llvm-project/commit/08d8f1a958bd8be681e3e1f346be80818a83a556.diff
LOG: [OpenMP][Tools] Cleanup memory pool used in Archer
The main motivation for reusing objects is that it helps to avoid creating and
leaking synchronization clocks in TSan. The reused object will reuse the
synchronization clock in TSan.
Before, new and delete operators were overloaded to get and return memory for
the object from/to the object pool.
This patch replaces the operator overloading with explicit static New/Delete
functions.
Objects for parallel regions and implicit tasks will always be recruited from and
returned to the thread-local object pool. Only for explicit tasks is there a
chance that another thread completes the task and will free the object. This
patch optimizes the thread-local New/Delete calls by avoiding locks and only
locking if the pool is empty. Remote threads return the object into a separate
queue.
The chunk size for allocations is now decided based on page size. The objects
will also be aligned to cache lines avoiding false sharing.
This is the first patch in a series to provide better tasking support.
Differential Revision: https://reviews.llvm.org/D103606
Added:
Modified:
openmp/tools/archer/README.md
openmp/tools/archer/ompt-tsan.cpp
openmp/tools/archer/tests/lit.cfg
Removed:
################################################################################
diff --git a/openmp/tools/archer/README.md b/openmp/tools/archer/README.md
index 2852fe061d203..0b02c638607e0 100644
--- a/openmp/tools/archer/README.md
+++ b/openmp/tools/archer/README.md
@@ -131,6 +131,15 @@ statement in main!)</td>
</tr>
</tbody>
+<tbody>
+<tr>
+<td class="org-left">report_data_leak</td>
+<td class="org-right">0</td>
+<td class="org-left">Report leaking OMPT data for execution under
+Archer. Used for testing and debugging Archer if errors occur.</td>
+</tr>
+</tbody>
+
<tbody>
<tr>
<td class="org-left">verbose</td>
diff --git a/openmp/tools/archer/ompt-tsan.cpp b/openmp/tools/archer/ompt-tsan.cpp
index 02df8e0c986a0..8d682d48c5688 100644
--- a/openmp/tools/archer/ompt-tsan.cpp
+++ b/openmp/tools/archer/ompt-tsan.cpp
@@ -24,8 +24,9 @@
#include <list>
#include <mutex>
#include <sstream>
-#include <stack>
#include <string>
+#include <sys/resource.h>
+#include <unistd.h>
#include <unordered_map>
#include <vector>
@@ -34,7 +35,6 @@
#endif
#include "omp-tools.h"
-#include <sys/resource.h>
// Define attribute that indicates that the fall through from the previous
// case label is intentional and should not be diagnosed by a compiler
@@ -61,6 +61,7 @@ class ArcherFlags {
int print_max_rss{0};
int verbose{0};
int enabled{1};
+ int report_data_leak{0};
int ignore_serial{0};
ArcherFlags(const char *env) {
@@ -82,6 +83,8 @@ class ArcherFlags {
continue;
if (sscanf(it->c_str(), "verbose=%d", &verbose))
continue;
+ if (sscanf(it->c_str(), "report_data_leak=%d", &report_data_leak))
+ continue;
if (sscanf(it->c_str(), "enable=%d", &enabled))
continue;
if (sscanf(it->c_str(), "ignore_serial=%d", &ignore_serial))
@@ -214,7 +217,7 @@ void __attribute__((weak)) __tsan_func_exit(void) {}
static ompt_get_parallel_info_t ompt_get_parallel_info;
static ompt_get_thread_data_t ompt_get_thread_data;
-typedef uint64_t ompt_tsan_clockid;
+typedef char ompt_tsan_clockid;
static uint64_t my_next_id() {
static uint64_t ID = 0;
@@ -222,100 +225,132 @@ static uint64_t my_next_id() {
return ret;
}
+static int pagesize{0};
+
// Data structure to provide a threadsafe pool of reusable objects.
-// DataPool<Type of objects, Size of blockalloc>
-template <typename T, int N> struct DataPool {
- std::mutex DPMutex;
- std::stack<T *> DataPointer;
+// DataPool<Type of objects>
+template <typename T> struct DataPool final {
+ static __thread DataPool<T> *ThreadDataPool;
+ std::mutex DPMutex{};
+
+ // store unused objects
+ std::vector<T *> DataPointer{};
+ std::vector<T *> RemoteDataPointer{};
+
+ // store all allocated memory to finally release
std::list<void *> memory;
- int total;
+ // count remotely returned data (RemoteDataPointer.size())
+ std::atomic<int> remote{0};
+
+ // totally allocated data objects in pool
+ int total{0};
+#ifdef DEBUG_DATA
+ int remoteReturn{0};
+ int localReturn{0};
+
+ int getRemote() { return remoteReturn + remote; }
+ int getLocal() { return localReturn; }
+#endif
+ int getTotal() { return total; }
+ int getMissing() {
+ return total - DataPointer.size() - RemoteDataPointer.size();
+ }
+
+ // fill the pool by allocating a page of memory
void newDatas() {
- // prefix the Data with a pointer to 'this', allows to return memory to
- // 'this',
- // without explicitly knowing the source.
- //
- // To reduce lock contention, we use thread local DataPools, but Data
- // objects move to other threads.
- // The strategy is to get objects from local pool. Only if the object moved
- // to another
- // thread, we might see a penalty on release (returnData).
- // For "single producer" pattern, a single thread creates tasks, these are
- // executed by other threads.
- // The master will have a high demand on TaskData, so return after use.
- struct pooldata {
- DataPool<T, N> *dp;
- T data;
- };
- // We alloc without initialize the memory. We cannot call constructors.
- // Therefore use malloc!
- pooldata *datas = (pooldata *)malloc(sizeof(pooldata) * N);
+ if (remote > 0) {
+ const std::lock_guard<std::mutex> lock(DPMutex);
+ // DataPointer is empty, so just swap the vectors
+ DataPointer.swap(RemoteDataPointer);
+ remote = 0;
+ return;
+ }
+ // calculate size of an object including padding to cacheline size
+ size_t elemSize = sizeof(T);
+ size_t paddedSize = (((elemSize - 1) / 64) + 1) * 64;
+ // number of padded elements to allocate
+ int ndatas = pagesize / paddedSize;
+ char *datas = (char *)malloc(ndatas * paddedSize);
memory.push_back(datas);
- for (int i = 0; i < N; i++) {
- datas[i].dp = this;
- DataPointer.push(&(datas[i].data));
+ for (int i = 0; i < ndatas; i++) {
+ DataPointer.push_back(new (datas + i * paddedSize) T(this));
}
- total += N;
+ total += ndatas;
}
+ // get data from the pool
T *getData() {
T *ret;
- DPMutex.lock();
if (DataPointer.empty())
newDatas();
- ret = DataPointer.top();
- DataPointer.pop();
- DPMutex.unlock();
+ ret = DataPointer.back();
+ DataPointer.pop_back();
return ret;
}
- void returnData(T *data) {
- DPMutex.lock();
- DataPointer.push(data);
- DPMutex.unlock();
- }
-
- void getDatas(int n, T **datas) {
- DPMutex.lock();
- for (int i = 0; i < n; i++) {
- if (DataPointer.empty())
- newDatas();
- datas[i] = DataPointer.top();
- DataPointer.pop();
- }
- DPMutex.unlock();
+ // accesses to the thread-local datapool don't need locks
+ void returnOwnData(T *data) {
+ DataPointer.emplace_back(data);
+#ifdef DEBUG_DATA
+ localReturn++;
+#endif
}
- void returnDatas(int n, T **datas) {
- DPMutex.lock();
- for (int i = 0; i < n; i++) {
- DataPointer.push(datas[i]);
- }
- DPMutex.unlock();
+ // returning to a remote datapool using lock
+ void returnData(T *data) {
+ const std::lock_guard<std::mutex> lock(DPMutex);
+ RemoteDataPointer.emplace_back(data);
+ remote++;
+#ifdef DEBUG_DATA
+ remoteReturn++;
+#endif
}
- DataPool() : DPMutex(), DataPointer(), total(0) {}
-
~DataPool() {
// we assume all memory is returned when the thread finished / destructor is
// called
+ if (archer_flags->report_data_leak && getMissing() != 0) {
+ printf("ERROR: While freeing DataPool (%s) we are missing %i data "
+ "objects.\n",
+ __PRETTY_FUNCTION__, getMissing());
+ exit(-3);
+ }
+ for (auto i : DataPointer)
+ if (i)
+ i->~T();
+ for (auto i : RemoteDataPointer)
+ if (i)
+ i->~T();
for (auto i : memory)
if (i)
free(i);
}
};
-// This function takes care to return the data to the originating DataPool
-// A pointer to the originating DataPool is stored just before the actual data.
-template <typename T, int N> static void retData(void *data) {
- ((DataPool<T, N> **)data)[-1]->returnData((T *)data);
-}
+template <typename T> struct DataPoolEntry {
+ DataPool<T> *owner;
+
+ static T *New() { return DataPool<T>::ThreadDataPool->getData(); }
+
+ void Delete() {
+ static_cast<T *>(this)->Reset();
+ if (owner == DataPool<T>::ThreadDataPool)
+ owner->returnOwnData(static_cast<T *>(this));
+ else
+ owner->returnData(static_cast<T *>(this));
+ }
+
+ DataPoolEntry(DataPool<T> *dp) : owner(dp) {}
+};
struct ParallelData;
-__thread DataPool<ParallelData, 4> *pdp;
+typedef DataPool<ParallelData> ParallelDataPool;
+template <>
+__thread ParallelDataPool *ParallelDataPool::ThreadDataPool = nullptr;
/// Data structure to store additional information for parallel regions.
-struct ParallelData {
+struct ParallelData final : DataPoolEntry<ParallelData> {
// Parallel fork is just another barrier, use Barrier[1]
@@ -328,14 +363,18 @@ struct ParallelData {
void *GetBarrierPtr(unsigned Index) { return &(Barrier[Index]); }
- ParallelData(const void *codeptr) : codePtr(codeptr) {}
- ~ParallelData() {
- TsanDeleteClock(&(Barrier[0]));
- TsanDeleteClock(&(Barrier[1]));
+ ParallelData *Init(const void *codeptr) {
+ codePtr = codeptr;
+ return this;
}
- // overload new/delete to use DataPool for memory management.
- void *operator new(size_t size) { return pdp->getData(); }
- void operator delete(void *p, size_t) { retData<ParallelData, 4>(p); }
+
+ void Reset() {}
+
+ static ParallelData *New(const void *codeptr) {
+ return DataPoolEntry<ParallelData>::New()->Init(codeptr);
+ }
+
+ ParallelData(DataPool<ParallelData> *dp) : DataPoolEntry<ParallelData>(dp) {}
};
static inline ParallelData *ToParallelData(ompt_data_t *parallel_data) {
@@ -343,95 +382,83 @@ static inline ParallelData *ToParallelData(ompt_data_t *parallel_data) {
}
struct Taskgroup;
-__thread DataPool<Taskgroup, 4> *tgp;
+typedef DataPool<Taskgroup> TaskgroupPool;
+template <> __thread TaskgroupPool *TaskgroupPool::ThreadDataPool = nullptr;
/// Data structure to support stacking of taskgroups and allow synchronization.
-struct Taskgroup {
+struct Taskgroup final : DataPoolEntry<Taskgroup> {
/// Its address is used for relationships of the taskgroup's task set.
ompt_tsan_clockid Ptr;
/// Reference to the parent taskgroup.
Taskgroup *Parent;
- Taskgroup(Taskgroup *Parent) : Parent(Parent) {}
- ~Taskgroup() { TsanDeleteClock(&Ptr); }
-
void *GetPtr() { return &Ptr; }
- // overload new/delete to use DataPool for memory management.
- void *operator new(size_t size) { return tgp->getData(); }
- void operator delete(void *p, size_t) { retData<Taskgroup, 4>(p); }
+
+ Taskgroup *Init(Taskgroup *parent) {
+ Parent = parent;
+ return this;
+ }
+
+ void Reset() {}
+
+ static Taskgroup *New(Taskgroup *Parent) {
+ return DataPoolEntry<Taskgroup>::New()->Init(Parent);
+ }
+
+ Taskgroup(DataPool<Taskgroup> *dp) : DataPoolEntry<Taskgroup>(dp) {}
};
struct TaskData;
-__thread DataPool<TaskData, 4> *tdp;
+typedef DataPool<TaskData> TaskDataPool;
+template <> __thread TaskDataPool *TaskDataPool::ThreadDataPool = nullptr;
/// Data structure to store additional information for tasks.
-struct TaskData {
+struct TaskData final : DataPoolEntry<TaskData> {
/// Its address is used for relationships of this task.
- ompt_tsan_clockid Task;
+ ompt_tsan_clockid Task{0};
/// Child tasks use its address to declare a relationship to a taskwait in
/// this task.
- ompt_tsan_clockid Taskwait;
+ ompt_tsan_clockid Taskwait{0};
/// Whether this task is currently executing a barrier.
- bool InBarrier;
+ bool InBarrier{false};
/// Whether this task is an included task.
int TaskType{0};
+ /// count execution phase
+ int execution{0};
+
/// Index of which barrier to use next.
- char BarrierIndex;
+ char BarrierIndex{0};
/// Count how often this structure has been put into child tasks + 1.
- std::atomic_int RefCount;
+ std::atomic_int RefCount{1};
/// Reference to the parent that created this task.
- TaskData *Parent;
+ TaskData *Parent{nullptr};
/// Reference to the implicit task in the stack above this task.
- TaskData *ImplicitTask;
+ TaskData *ImplicitTask{nullptr};
/// Reference to the team of this task.
- ParallelData *Team;
+ ParallelData *Team{nullptr};
/// Reference to the current taskgroup that this task either belongs to or
/// that it just created.
- Taskgroup *TaskGroup;
+ Taskgroup *TaskGroup{nullptr};
/// Dependency information for this task.
- ompt_dependence_t *Dependencies;
+ ompt_dependence_t *Dependencies{nullptr};
/// Number of dependency entries.
- unsigned DependencyCount;
-
- void *PrivateData;
- size_t PrivateDataSize;
-
- int execution;
- int freed;
+ unsigned DependencyCount{0};
- TaskData(TaskData *Parent, int taskType)
- : InBarrier(false), TaskType(taskType), BarrierIndex(0), RefCount(1),
- Parent(Parent), ImplicitTask(nullptr), Team(Parent->Team),
- TaskGroup(nullptr), DependencyCount(0), execution(0), freed(0) {
- if (Parent != nullptr) {
- Parent->RefCount++;
- // Copy over pointer to taskgroup. This task may set up its own stack
- // but for now belongs to its parent's taskgroup.
- TaskGroup = Parent->TaskGroup;
- }
- }
-
- TaskData(ParallelData *Team, int taskType)
- : InBarrier(false), TaskType(taskType), BarrierIndex(0), RefCount(1),
- Parent(nullptr), ImplicitTask(this), Team(Team), TaskGroup(nullptr),
- DependencyCount(0), execution(1), freed(0) {}
-
- ~TaskData() {
- TsanDeleteClock(&Task);
- TsanDeleteClock(&Taskwait);
- }
+#ifdef DEBUG
+ int freed{0};
+#endif
bool isIncluded() { return TaskType & ompt_task_undeferred; }
bool isUntied() { return TaskType & ompt_task_untied; }
@@ -447,9 +474,54 @@ struct TaskData {
void *GetTaskPtr() { return &Task; }
void *GetTaskwaitPtr() { return &Taskwait; }
- // overload new/delete to use DataPool for memory management.
- void *operator new(size_t size) { return tdp->getData(); }
- void operator delete(void *p, size_t) { retData<TaskData, 4>(p); }
+
+ TaskData *Init(TaskData *parent, int taskType) {
+ TaskType = taskType;
+ Parent = parent;
+ Team = Parent->Team;
+ if (Parent != nullptr) {
+ Parent->RefCount++;
+ // Copy over pointer to taskgroup. This task may set up its own stack
+ // but for now belongs to its parent's taskgroup.
+ TaskGroup = Parent->TaskGroup;
+ }
+ return this;
+ }
+
+ TaskData *Init(ParallelData *team, int taskType) {
+ TaskType = taskType;
+ execution = 1;
+ ImplicitTask = this;
+ Team = team;
+ return this;
+ }
+
+ void Reset() {
+ InBarrier = false;
+ TaskType = 0;
+ execution = 0;
+ BarrierIndex = 0;
+ RefCount = 1;
+ Parent = nullptr;
+ ImplicitTask = nullptr;
+ Team = nullptr;
+ TaskGroup = nullptr;
+ Dependencies = nullptr;
+ DependencyCount = 0;
+#ifdef DEBUG
+ freed = 0;
+#endif
+ }
+
+ static TaskData *New(TaskData *parent, int taskType) {
+ return DataPoolEntry<TaskData>::New()->Init(parent, taskType);
+ }
+
+ static TaskData *New(ParallelData *team, int taskType) {
+ return DataPoolEntry<TaskData>::New()->Init(team, taskType);
+ }
+
+ TaskData(DataPool<TaskData> *dp) : DataPoolEntry<TaskData>(dp) {}
};
static inline TaskData *ToTaskData(ompt_data_t *task_data) {
@@ -470,19 +542,22 @@ std::mutex LocksMutex;
static void ompt_tsan_thread_begin(ompt_thread_t thread_type,
ompt_data_t *thread_data) {
- pdp = new DataPool<ParallelData, 4>;
- TsanNewMemory(pdp, sizeof(pdp));
- tgp = new DataPool<Taskgroup, 4>;
- TsanNewMemory(tgp, sizeof(tgp));
- tdp = new DataPool<TaskData, 4>;
- TsanNewMemory(tdp, sizeof(tdp));
+ ParallelDataPool::ThreadDataPool = new ParallelDataPool;
+ TsanNewMemory(ParallelDataPool::ThreadDataPool,
+ sizeof(ParallelDataPool::ThreadDataPool));
+ TaskgroupPool::ThreadDataPool = new TaskgroupPool;
+ TsanNewMemory(TaskgroupPool::ThreadDataPool,
+ sizeof(TaskgroupPool::ThreadDataPool));
+ TaskDataPool::ThreadDataPool = new TaskDataPool;
+ TsanNewMemory(TaskDataPool::ThreadDataPool,
+ sizeof(TaskDataPool::ThreadDataPool));
thread_data->value = my_next_id();
}
static void ompt_tsan_thread_end(ompt_data_t *thread_data) {
- delete pdp;
- delete tgp;
- delete tdp;
+ delete ParallelDataPool::ThreadDataPool;
+ delete TaskgroupPool::ThreadDataPool;
+ delete TaskDataPool::ThreadDataPool;
}
/// OMPT event callbacks for handling parallel regions.
@@ -492,7 +567,7 @@ static void ompt_tsan_parallel_begin(ompt_data_t *parent_task_data,
ompt_data_t *parallel_data,
uint32_t requested_team_size, int flag,
const void *codeptr_ra) {
- ParallelData *Data = new ParallelData(codeptr_ra);
+ ParallelData *Data = ParallelData::New(codeptr_ra);
parallel_data->ptr = Data;
TsanHappensBefore(Data->GetParallelPtr());
@@ -509,7 +584,7 @@ static void ompt_tsan_parallel_end(ompt_data_t *parallel_data,
TsanHappensAfter(Data->GetBarrierPtr(0));
TsanHappensAfter(Data->GetBarrierPtr(1));
- delete Data;
+ Data->Delete();
#if (LLVM_VERSION >= 40)
if (&__archer_get_omp_status) {
@@ -527,19 +602,24 @@ static void ompt_tsan_implicit_task(ompt_scope_endpoint_t endpoint,
switch (endpoint) {
case ompt_scope_begin:
if (type & ompt_task_initial) {
- parallel_data->ptr = new ParallelData(nullptr);
+ parallel_data->ptr = ParallelData::New(nullptr);
}
- task_data->ptr = new TaskData(ToParallelData(parallel_data), type);
+ task_data->ptr = TaskData::New(ToParallelData(parallel_data), type);
TsanHappensAfter(ToParallelData(parallel_data)->GetParallelPtr());
TsanFuncEntry(ToParallelData(parallel_data)->codePtr);
break;
case ompt_scope_end: {
TaskData *Data = ToTaskData(task_data);
+#ifdef DEBUG
assert(Data->freed == 0 && "Implicit task end should only be called once!");
Data->freed = 1;
+#endif
assert(Data->RefCount == 1 &&
"All tasks should have finished at the implicit barrier!");
- delete Data;
+ Data->Delete();
+ if (type & ompt_task_initial) {
+ ToParallelData(parallel_data)->Delete();
+ }
TsanFuncExit();
break;
}
@@ -588,7 +668,7 @@ static void ompt_tsan_sync_region(ompt_sync_region_t kind,
break;
case ompt_sync_region_taskgroup:
- Data->TaskGroup = new Taskgroup(Data->TaskGroup);
+ Data->TaskGroup = Taskgroup::New(Data->TaskGroup);
break;
case ompt_sync_region_reduction:
@@ -643,7 +723,7 @@ static void ompt_tsan_sync_region(ompt_sync_region_t kind,
// Delete this allocated taskgroup, all descendent task are finished by
// now.
Taskgroup *Parent = Data->TaskGroup->Parent;
- delete Data->TaskGroup;
+ Data->TaskGroup->Delete();
Data->TaskGroup = Parent;
break;
}
@@ -705,16 +785,16 @@ static void ompt_tsan_task_create(
ompt_data_t *parallel_data;
int team_size = 1;
ompt_get_parallel_info(0, &parallel_data, &team_size);
- ParallelData *PData = new ParallelData(nullptr);
+ ParallelData *PData = ParallelData::New(nullptr);
parallel_data->ptr = PData;
- Data = new TaskData(PData, type);
+ Data = TaskData::New(PData, type);
new_task_data->ptr = Data;
} else if (type & ompt_task_undeferred) {
- Data = new TaskData(ToTaskData(parent_task_data), type);
+ Data = TaskData::New(ToTaskData(parent_task_data), type);
new_task_data->ptr = Data;
} else if (type & ompt_task_explicit || type & ompt_task_target) {
- Data = new TaskData(ToTaskData(parent_task_data), type);
+ Data = TaskData::New(ToTaskData(parent_task_data), type);
new_task_data->ptr = Data;
// Use the newly created address. We cannot use a single address from the
@@ -731,7 +811,7 @@ static void __ompt_tsan_release_task(TaskData *task) {
if (task->DependencyCount > 0) {
delete[] task->Dependencies;
}
- delete task;
+ task->Delete();
task = Parent;
}
}
@@ -1006,6 +1086,8 @@ ompt_start_tool(unsigned int omp_version, const char *runtime_version) {
return NULL;
}
+ pagesize = getpagesize();
+
static ompt_start_tool_result_t ompt_start_tool_result = {
&ompt_tsan_initialize, &ompt_tsan_finalize, {0}};
diff --git a/openmp/tools/archer/tests/lit.cfg b/openmp/tools/archer/tests/lit.cfg
index 3a67cac0b219e..7396e4b0d5d38 100644
--- a/openmp/tools/archer/tests/lit.cfg
+++ b/openmp/tools/archer/tests/lit.cfg
@@ -90,9 +90,12 @@ if config.has_tsan == True:
if 'INTEL_LICENSE_FILE' in os.environ:
config.environment['INTEL_LICENSE_FILE'] = os.environ['INTEL_LICENSE_FILE']
+config.environment['ARCHER_OPTIONS'] = "report_data_leak=1"
+
# Race Tests
config.substitutions.append(("%libarcher-compile-and-run-race-noserial", \
- "%libarcher-compile && env ARCHER_OPTIONS=ignore_serial=1 %libarcher-run-race"))
+ "%%libarcher-compile && env ARCHER_OPTIONS=\"ignore_serial=1 %s\" %%libarcher-run-race" \
+ % config.environment['ARCHER_OPTIONS']))
config.substitutions.append(("%libarcher-compile-and-run-race", \
"%libarcher-compile && %libarcher-run-race"))
config.substitutions.append(("%libarcher-compile-and-run-nosuppression", \
More information about the Openmp-commits
mailing list