8
8
import atexit
9
9
import os
10
10
import subprocess
11
+ import sys
11
12
import typing as t
12
13
13
- from celery import Celery as _Celery
14
+ import celery
14
15
15
16
from ..types import LogLevel
16
17
18
+ CELERY_MAIN_PATH = os .path .join (celery .__path__ [0 ], "__main__.py" )
17
19
18
- class CeleryApplication (_Celery ):
20
+
21
+ class CeleryServer (celery .Celery ):
19
22
"""A server for a Celery app."""
20
23
21
24
def __init__ (
@@ -25,8 +28,18 @@ def __init__(
25
28
):
26
29
"""Initialize a Celery app.
27
30
31
+ Examples:
32
+ ```
33
+ from codeforlife.servers import CeleryServer, DjangoServer
34
+
35
+ # Make sure to set up Django before initializing!
36
+ DjangoServer.setup()
37
+
38
+ celery_app = CeleryServer().app
39
+ ```
40
+
28
41
Args:
29
- app: The Celery app name .
42
+ app: The dot-path to the Celery app.
30
43
debug: A flag designating whether to run the app in debug mode.
31
44
32
45
Raises:
@@ -39,8 +52,8 @@ def __init__(
39
52
)
40
53
41
54
super ().__init__ ()
42
-
43
- self .app = app
55
+ self . app = self
56
+ self ._app = app
44
57
45
58
# Using a string here means the worker doesn't have to serialize
46
59
# the configuration object to child processes.
@@ -59,6 +72,10 @@ def _debug(self, *args, **kwargs):
59
72
60
73
print (f"Request: { self .request !r} " )
61
74
75
+ if os .path .abspath (sys .argv [0 ]) != CELERY_MAIN_PATH :
76
+ self .start_background_workers ()
77
+ self .start_background_beat ()
78
+
62
79
def start_background_workers (
63
80
self ,
64
81
workers : t .Optional [t .Union [t .Set [str ], t .Dict [str , int ]]] = None ,
@@ -74,8 +91,6 @@ def start_background_workers(
74
91
"""
75
92
# pylint: enable=line-too-long
76
93
77
- print ("Starting all Celery workers." )
78
-
79
94
commands : t .Dict [str , t .List [str ]] = {}
80
95
81
96
def build_command (worker : str , concurrency : int = 0 ):
@@ -84,7 +99,7 @@ def build_command(worker: str, concurrency: int = 0):
84
99
"multi" ,
85
100
"start" ,
86
101
worker ,
87
- f"--app={ self .app } " ,
102
+ f"--app={ self ._app } " ,
88
103
f"--loglevel={ log_level } " ,
89
104
]
90
105
@@ -106,20 +121,11 @@ def build_command(worker: str, concurrency: int = 0):
106
121
107
122
for worker , command in commands .items ():
108
123
try :
109
- process = subprocess .run (
110
- command ,
111
- capture_output = True ,
112
- text = True ,
113
- check = True ,
114
- )
115
- print (f"Successfully started Celery worker '{ worker } '." )
116
- print (process .stdout )
117
-
124
+ subprocess .run (command , check = True )
118
125
successfully_started_workers .add (worker )
119
126
120
- except subprocess .CalledProcessError as e :
121
- print (f"Error starting Celery worker '{ worker } ': { e } " )
122
- print (e .stderr )
127
+ except Exception as ex : # pylint: disable=broad-exception-caught
128
+ print (f"Error starting Celery worker '{ worker } ': { ex } " )
123
129
124
130
if successfully_started_workers :
125
131
atexit .register (
@@ -141,28 +147,21 @@ def stop_background_workers(
141
147
log_level: The log level.
142
148
"""
143
149
144
- print ("Stopping all Celery workers." )
145
-
146
150
try :
147
- process = subprocess .run (
151
+ subprocess .run (
148
152
[
149
153
"celery" ,
150
154
"multi" ,
151
155
"stopwait" ,
152
156
* workers ,
153
- f"--app={ self .app } " ,
157
+ f"--app={ self ._app } " ,
154
158
f"--loglevel={ log_level } " ,
155
159
],
156
- capture_output = True ,
157
- text = True ,
158
160
check = True ,
159
161
)
160
- print ("Successfully stopped all Celery workers." )
161
- print (process .stdout )
162
162
163
- except subprocess .CalledProcessError as error :
164
- print (f"Error stopping all Celery workers: { error } " )
165
- print (error .stderr )
163
+ except Exception as ex : # pylint: disable=broad-exception-caught
164
+ print (f"Error stopping all Celery workers: { ex } " )
166
165
167
166
def start_background_beat (self , log_level : LogLevel = "INFO" ):
168
167
"""Start Celery beat using the 'celery --app=app beat' command.
@@ -174,64 +173,23 @@ def start_background_beat(self, log_level: LogLevel = "INFO"):
174
173
The background process running Celery beat.
175
174
"""
176
175
177
- print ("Starting Celery beat." )
178
-
179
176
try :
180
177
process = subprocess .Popen ( # pylint: disable=consider-using-with
181
178
[
182
179
"celery" ,
183
- f"--app={ self .app } " ,
180
+ f"--app={ self ._app } " ,
184
181
"beat" ,
185
182
f"--loglevel={ log_level } " ,
186
183
],
187
184
stdout = subprocess .DEVNULL ,
188
185
stderr = subprocess .DEVNULL ,
189
186
)
190
- print ("Successfully started Celery beat." )
191
187
192
- atexit .register (self . stop_background_beat , process )
188
+ atexit .register (process . terminate )
193
189
194
190
return process
195
191
196
- except subprocess .CalledProcessError as error :
197
- print (f"Error starting Celery beat: { error } " )
198
-
199
- return None
200
-
201
- def stop_background_beat (self , process : subprocess .Popen ):
202
- """Stop a Celery beat process.
203
-
204
- Args:
205
- process: The process to stop.
206
- """
207
-
208
- print ("Stopping Celery beat." )
209
-
210
- try :
211
- process .terminate ()
212
- print ("Successfully stopped Celery beat." )
213
-
214
192
except Exception as ex : # pylint: disable=broad-exception-caught
215
- print (f"Error stopping Celery beat: { ex } " )
216
-
217
- def handle_startup (self ):
218
- """Handle the startup procedure of a Celery app.
219
-
220
- Examples:
221
- ```
222
- from codeforlife.apps import CeleryApplication, DjangoApplication
223
-
224
- # Make sure to set up Django before starting!
225
- DjangoApplication.setup()
193
+ print (f"Error starting Celery beat: { ex } " )
226
194
227
- celery_app = CeleryApplication().handle_startup()
228
- ```
229
-
230
- Returns:
231
- The Celery app instance for convenience.
232
- """
233
-
234
- self .start_background_workers ()
235
- self .start_background_beat ()
236
-
237
- return self
195
+ return None
0 commit comments