Skip to content
This repository was archived by the owner on Nov 17, 2023. It is now read-only.

Commit ff4efd9

Browse files
committed
Refactor LibraryInitializer so it's thread safe.
Fixes #13438 Fixes #14979
1 parent 3112893 commit ff4efd9

File tree

6 files changed

+201
-58
lines changed

6 files changed

+201
-58
lines changed

docs/faq/env_var.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
3939

4040
## Set the Number of Threads
4141

42+
* MXNET_OMP_MAX_THREADS
43+
- Values: Int ```(default=Number of processors / Number of processors * 2 in X86)```
44+
- Maximum number of threads to use in individual operators through OpenMP. If not set, `OMP_NUM_THREADS` is considered instead.
4245
* MXNET_GPU_WORKER_NTHREADS
4346
- Values: Int ```(default=2)```
4447
- The maximum number of threads to use on each GPU. This parameter is used to parallelize the computation within a single GPU card.
@@ -47,7 +50,7 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
4750
- The maximum number of concurrent threads that do the memory copy job on each GPU.
4851
* MXNET_CPU_WORKER_NTHREADS
4952
- Values: Int ```(default=1)```
50-
- The maximum number of scheduling threads on CPU. It specifies how many operators can be run in parallel. Note that most CPU operators are parallelized by OpenMP. To change the number of threads used by individual operators, please set `OMP_NUM_THREADS` instead.
53+
- The maximum number of scheduling threads on CPU. It specifies how many operators can be run in parallel. Note that most CPU operators are parallelized by OpenMP. To change the number of threads used by individual operators, please set `MXNET_OMP_MAX_THREADS` instead.
5154
* MXNET_CPU_PRIORITY_NTHREADS
5255
- Values: Int ```(default=4)```
5356
- The number of threads given to prioritized CPU jobs.
@@ -56,10 +59,13 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
5659
- The number of threads used for NNPACK. NNPACK package aims to provide high-performance implementations of some layers for multi-core CPUs. Checkout [NNPACK](http://mxnet.io/faq/nnpack.html) to know more about it.
5760
* MXNET_MP_WORKER_NTHREADS
5861
- Values: Int ```(default=1)```
59-
- The number of scheduling threads on CPU given to multiprocess workers. Enlarge this number allows more operators to run in parallel in individual workers but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows).
62+
- The number of scheduling threads on CPU given to multiprocess workers (after fork). Enlarging this number allows more operators to run in parallel in individual workers, but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows).
6063
* MXNET_MP_OPENCV_NUM_THREADS
6164
- Values: Int ```(default=0)```
6265
- The number of OpenCV execution threads given to multiprocess workers. OpenCV multithreading is disabled if `MXNET_MP_OPENCV_NUM_THREADS` < 1 (default). Enlarging this number may boost the performance of individual workers when executing underlying OpenCV functions, but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows).
66+
* MXNET_GPU_COPY_NTHREADS
67+
- Values: Int ```(default=2)```
68+
- Number of threads for copying data from CPU to GPU.
6369

6470
## Memory Options
6571

src/common/utils.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,22 @@
5050
#include "../operator/nn/mkldnn/mkldnn_base-inl.h"
5151
#endif
5252

53+
#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
54+
#include <windows.h>
55+
#else
56+
#include <unistd.h>
57+
#include <cstdint>
58+
#endif
59+
60+
5361
namespace mxnet {
5462
namespace common {
5563

64+
#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
65+
inline size_t current_process_id() { return ::GetCurrentProcessId(); }
66+
#else
67+
inline size_t current_process_id() { return getpid(); }
68+
#endif
5669
/*!
5770
* \brief IndPtr should be non-negative, in non-decreasing order, start with 0
5871
* and end with value equal with size of indices.

src/engine/threaded_engine_perdevice.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <dmlc/parameter.h>
2929
#include <dmlc/concurrency.h>
3030
#include <dmlc/thread_group.h>
31+
#include <initialize.h>
3132
#include "./threaded_engine.h"
3233
#include "./thread_pool.h"
3334
#include "../common/lazy_alloc_array.h"
@@ -76,7 +77,8 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
7677
void Start() override {
7778
if (is_worker_) return;
7879
gpu_worker_nthreads_ = common::GetNumThreadsPerGPU();
79-
cpu_worker_nthreads_ = dmlc::GetEnv("MXNET_CPU_WORKER_NTHREADS", 1);
80+
// MXNET_CPU_WORKER_NTHREADS
81+
cpu_worker_nthreads_ = LibraryInitializer::Get()->cpu_worker_nthreads_;
8082
gpu_copy_nthreads_ = dmlc::GetEnv("MXNET_GPU_COPY_NTHREADS", 2);
8183
// create CPU task
8284
int cpu_priority_nthreads = dmlc::GetEnv("MXNET_CPU_PRIORITY_NTHREADS", 4);

src/initialize.cc

Lines changed: 82 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
* \file initialize.cc
2323
* \brief initialize mxnet library
2424
*/
25+
#include "initialize.h"
2526
#include <signal.h>
2627
#include <dmlc/logging.h>
2728
#include <mxnet/engine.h>
@@ -30,8 +31,11 @@
3031
#if MXNET_USE_OPENCV
3132
#include <opencv2/opencv.hpp>
3233
#endif // MXNET_USE_OPENCV
34+
#include "common/utils.h"
35+
#include "engine/openmp.h"
3336

3437
namespace mxnet {
38+
3539
#if MXNET_USE_SIGNAL_HANDLER && DMLC_LOG_STACK_TRACE
3640
static void SegfaultLogger(int sig) {
3741
fprintf(stderr, "\nSegmentation fault: %d\n\n", sig);
@@ -40,58 +44,93 @@ static void SegfaultLogger(int sig) {
4044
}
4145
#endif
4246

43-
class LibraryInitializer {
44-
public:
45-
LibraryInitializer() {
46-
dmlc::InitLogging("mxnet");
47-
#if MXNET_USE_SIGNAL_HANDLER && DMLC_LOG_STACK_TRACE
48-
struct sigaction sa;
49-
sigaction(SIGSEGV, nullptr, &sa);
50-
if (sa.sa_handler == nullptr) {
51-
signal(SIGSEGV, SegfaultLogger);
52-
}
53-
#endif
47+
// pthread_atfork handlers, delegated to LibraryInitializer members.
5448

55-
// disable openmp for multithreaded workers
56-
#ifndef _WIN32
57-
using op::custom::CustomOperator;
58-
pthread_atfork(
59-
[]() {
60-
CustomOperator::Get()->Stop();
61-
Engine::Get()->Stop();
62-
},
63-
[]() {
64-
Engine::Get()->Start();
65-
CustomOperator::Get()->Start();
66-
},
67-
[]() {
68-
// Conservative thread management for multiprocess workers
69-
const size_t mp_worker_threads = dmlc::GetEnv("MXNET_MP_WORKER_NTHREADS", 1);
70-
dmlc::SetEnv("MXNET_CPU_WORKER_NTHREADS", mp_worker_threads);
71-
dmlc::SetEnv("OMP_NUM_THREADS", 1);
49+
void pthread_atfork_prepare() {
50+
LibraryInitializer* library_initializer = LibraryInitializer::Get();
51+
library_initializer->atfork_prepare();
52+
}
53+
54+
void pthread_atfork_parent() {
55+
LibraryInitializer* library_initializer = LibraryInitializer::Get();
56+
library_initializer->atfork_parent();
57+
}
58+
59+
void pthread_atfork_child() {
60+
LibraryInitializer* library_initializer = LibraryInitializer::Get();
61+
library_initializer->atfork_child();
62+
}
63+
64+
// LibraryInitializer member functions
65+
66+
LibraryInitializer::LibraryInitializer()
67+
: original_pid_(common::current_process_id()),
68+
mp_worker_nthreads_(dmlc::GetEnv("MXNET_MP_WORKER_NTHREADS", 1)),
69+
cpu_worker_nthreads_(dmlc::GetEnv("MXNET_CPU_WORKER_NTHREADS", 1)),
70+
mp_cv_num_threads_(dmlc::GetEnv("MXNET_MP_OPENCV_NUM_THREADS", 0))
71+
{
72+
dmlc::InitLogging("mxnet");
73+
engine::OpenMP::Get(); // force OpenMP initialization
74+
install_signal_handlers();
75+
install_pthread_atfork_handlers();
76+
}
77+
78+
bool LibraryInitializer::was_forked() const {
79+
return common::current_process_id() != original_pid_;
80+
}
81+
82+
void LibraryInitializer::atfork_prepare() {
83+
using op::custom::CustomOperator;
84+
CustomOperator::Get()->Stop();
85+
Engine::Get()->Stop();
86+
}
87+
88+
void LibraryInitializer::atfork_parent() {
89+
using op::custom::CustomOperator;
90+
Engine::Get()->Start();
91+
CustomOperator::Get()->Start();
92+
}
93+
94+
void LibraryInitializer::atfork_child() {
95+
using op::custom::CustomOperator;
96+
// Conservative thread management for multiprocess workers: the forked child schedules CPU work with MXNET_MP_WORKER_NTHREADS threads (not the OpenCV thread count).
97+
this->cpu_worker_nthreads_ = this->mp_worker_nthreads_;
7298
#if MXNET_USE_OPENCV && !__APPLE__
73-
const size_t mp_cv_num_threads = dmlc::GetEnv("MXNET_MP_OPENCV_NUM_THREADS", 0);
74-
cv::setNumThreads(mp_cv_num_threads); // disable opencv threading
99+
cv::setNumThreads(mp_cv_num_threads_);
75100
#endif // MXNET_USE_OPENCV
76-
engine::OpenMP::Get()->set_enabled(false);
77-
Engine::Get()->Start();
78-
CustomOperator::Get()->Start();
79-
});
80-
#endif
81-
}
101+
engine::OpenMP::Get()->set_thread_max(1);
102+
engine::OpenMP::Get()->set_enabled(false);
103+
Engine::Get()->Start();
104+
CustomOperator::Get()->Start();
105+
}
82106

83-
static LibraryInitializer* Get();
84-
};
107+
void LibraryInitializer::install_pthread_atfork_handlers() {
108+
#ifndef _WIN32
109+
pthread_atfork(pthread_atfork_prepare, pthread_atfork_parent, pthread_atfork_child);
110+
#endif
111+
}
85112

86-
LibraryInitializer* LibraryInitializer::Get() {
87-
static LibraryInitializer inst;
88-
return &inst;
113+
void LibraryInitializer::install_signal_handlers() {
114+
#if MXNET_USE_SIGNAL_HANDLER && DMLC_LOG_STACK_TRACE
115+
struct sigaction sa;
116+
sigaction(SIGSEGV, nullptr, &sa);
117+
if (sa.sa_handler == nullptr) {
118+
signal(SIGSEGV, SegfaultLogger);
119+
}
120+
#endif
89121
}
90122

123+
/**
124+
* Perform static initialization
125+
*/
91126
#ifdef __GNUC__
92-
// Don't print an unused variable message since this is intentional
127+
// In GCC we use constructor to perform initialization before any static initializer is able to run
128+
__attribute__((constructor)) static void LibraryInitializerEntry() {
93129
#pragma GCC diagnostic ignored "-Wunused-variable"
130+
volatile LibraryInitializer* library_init = LibraryInitializer::Get();
131+
}
132+
#else
133+
static LibraryInitializer* __library_init = LibraryInitializer::Get();
94134
#endif
95135

96-
static LibraryInitializer* __library_init = LibraryInitializer::Get();
97136
} // namespace mxnet

src/initialize.h

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
/*!
21+
* Copyright (c) 2019 by Contributors
22+
* \file initialize.h
23+
* \brief Library initialization
24+
*/
25+
26+
#include <cstdlib>
27+
28+
#ifndef MXNET_INITIALIZE_H_
29+
#define MXNET_INITIALIZE_H_
30+
31+
namespace mxnet {
32+
33+
void pthread_atfork_prepare();
34+
void pthread_atfork_parent();
35+
void pthread_atfork_child();
36+
37+
/**
38+
* Perform library initialization and control multiprocessing behaviour.
39+
*/
40+
class LibraryInitializer {
41+
public:
42+
static LibraryInitializer* Get() {
43+
static LibraryInitializer inst;
44+
return &inst;
45+
}
46+
47+
/**
48+
* Library initialization. Called on library loading via constructor attributes or
49+
* C++ static initialization.
50+
*/
51+
LibraryInitializer();
52+
53+
/**
54+
* @return true if the current pid doesn't match the one that initialized the library
55+
*/
56+
bool was_forked() const;
57+
58+
/**
59+
* Original pid of the process which first loaded and initialized the library
60+
*/
61+
size_t original_pid_;
62+
size_t mp_worker_nthreads_;
63+
size_t cpu_worker_nthreads_;
64+
size_t omp_num_threads_;
65+
size_t mp_cv_num_threads_;
66+
67+
// Actual code for the atfork handlers as member functions.
68+
69+
void atfork_prepare();
70+
void atfork_parent();
71+
void atfork_child();
72+
73+
private:
74+
/**
75+
* Pthread atfork handlers are used to reset the concurrency state of modules like CustomOperator
76+
* and Engine when forking. When forking only the thread that forks is kept alive and memory is
77+
* copied to the new process so state is inconsistent. This call installs the handlers.
78+
* Has no effect on Windows.
79+
*
80+
* https://pubs.opengroup.org/onlinepubs/009695399/functions/pthread_atfork.html
81+
*/
82+
void install_pthread_atfork_handlers();
83+
84+
/**
85+
* Install signal handlers (UNIX). Has no effect on Windows.
86+
*/
87+
void install_signal_handlers();
88+
};
89+
90+
91+
} // namespace mxnet
92+
#endif // MXNET_INITIALIZE_H_

src/profiler/profiler.h

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,22 +36,13 @@
3636
#include "./vtune.h"
3737
#include "./aggregate_stats.h"
3838
#include "./nvtx.h"
39+
#include "../common/utils.h"
3940

40-
#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
41-
#include <windows.h>
42-
#else
43-
#include <unistd.h>
44-
#include <cstdint>
45-
#endif
4641

4742
namespace mxnet {
4843
namespace profiler {
4944

50-
#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
51-
inline size_t current_process_id() { return ::GetCurrentProcessId(); }
52-
#else
53-
inline size_t current_process_id() { return getpid(); }
54-
#endif
45+
5546

5647
/*!
5748
* \brief Constant-sized character array class with simple string API to avoid allocations
@@ -132,7 +123,7 @@ struct ProfileStat {
132123
bool enable_aggregate_ = true;
133124

134125
/* !\brief Process id */
135-
size_t process_id_ = current_process_id();
126+
size_t process_id_ = common::current_process_id();
136127

137128
/*! \brief id of thread which operation run on.
138129
*

0 commit comments

Comments
 (0)