Skip to content

Commit c6eb826

Browse files
authored
[IBM Code Engine] Added local disk variable for scheduler and worker (#440)
* [IBM Code Engine] Added local disk storage to scheduler and workers
* [IBM Code Engine] Local disk clarification added
1 parent 6eaf2db commit c6eb826

File tree

2 files changed

+27
-0
lines changed

2 files changed

+27
-0
lines changed

dask_cloudprovider/cloudprovider.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,11 @@ cloudprovider:
128128
project_id: null
129129
scheduler_cpu: "1.0"
130130
scheduler_mem: 4G
131+
scheduler_disk: 400M
131132
scheduler_timeout: 600 # seconds
132133
worker_cpu: "2.0"
133134
worker_mem: 8G
135+
worker_disk: 400M
134136
worker_threads: 1
135137

136138
openstack:

dask_cloudprovider/ibm/code_engine.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ def __init__(
4141
project_id: str = None,
4242
scheduler_cpu: str = None,
4343
scheduler_mem: str = None,
44+
scheduler_disk: str = None,
4445
scheduler_timeout: int = None,
4546
worker_cpu: str = None,
4647
worker_mem: str = None,
48+
worker_disk: str = None,
4749
worker_threads: int = None,
4850
api_key: str = None,
4951
**kwargs,
@@ -56,9 +58,11 @@ def __init__(
5658
self.project_id = project_id
5759
self.scheduler_cpu = scheduler_cpu
5860
self.scheduler_mem = scheduler_mem
61+
self.scheduler_disk = scheduler_disk
5962
self.scheduler_timeout = scheduler_timeout
6063
self.worker_cpu = worker_cpu
6164
self.worker_mem = worker_mem
65+
self.worker_disk = worker_disk
6266
self.worker_threads = worker_threads
6367
self.api_key = api_key
6468

@@ -89,6 +93,7 @@ async def create_vm(self):
8993
scale_min_instances=1,
9094
scale_concurrency=1000,
9195
scale_memory_limit=self.memory,
96+
scale_ephemeral_storage_limit=self.disk,
9297
scale_request_timeout=self.cluster.scheduler_timeout,
9398
run_env_variables=[
9499
{
@@ -144,6 +149,7 @@ def create_job_run_thread():
144149
run_commands=self.command,
145150
scale_cpu_limit=self.cpu,
146151
scale_memory_limit=self.memory,
152+
scale_ephemeral_storage_limit=self.disk,
147153
run_env_variables=[
148154
{
149155
"type": "config_map_key_reference",
@@ -194,6 +200,7 @@ def __init__(self, *args, **kwargs):
194200
super().__init__(*args, **kwargs)
195201
self.cpu = self.cluster.scheduler_cpu
196202
self.memory = self.cluster.scheduler_mem
203+
self.disk = self.cluster.scheduler_disk
197204

198205
self.command = [
199206
"python",
@@ -211,9 +218,11 @@ async def start(self):
211218
f"\n Project id: {self.project_id} "
212219
f"\n Scheduler CPU: {self.cpu} "
213220
f"\n Scheduler Memory: {self.memory} "
221+
f"\n Scheduler Disk: {self.disk} "
214222
f"\n Scheduler Timeout: {self.cluster.scheduler_timeout} "
215223
f"\n Worker CPU: {self.cluster.worker_cpu} "
216224
f"\n Worker Memory: {self.cluster.worker_mem} "
225+
f"\n Worker Disk: {self.cluster.worker_disk} "
217226
f"\n Worker Threads: {self.cluster.worker_threads} "
218227
)
219228
self.cluster._log(f"Creating scheduler instance {self.name}")
@@ -244,6 +253,7 @@ def __init__(
244253
self.worker_options = worker_options
245254
self.cpu = self.cluster.worker_cpu
246255
self.memory = self.cluster.worker_mem
256+
self.disk = self.cluster.worker_disk
247257

248258
# In this case, the worker must connect to the scheduler internal URL with the "ws" protocol and port 80
249259
internal_scheduler = f"ws://{self.cluster.scheduler_internal_ip}:80"
@@ -300,6 +310,8 @@ class IBMCodeEngineCluster(VMCluster):
300310
The amount of memory to allocate to the scheduler.
301311
302312
See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo
313+
scheduler_disk: str
314+
The amount of ephemeral storage to allocate to the scheduler. This value must be lower than scheduler_mem.
303315
scheduler_timeout: int
304316
The timeout for the scheduler in seconds.
305317
worker_cpu: str
@@ -310,6 +322,8 @@ class IBMCodeEngineCluster(VMCluster):
310322
The amount of memory to allocate to each worker.
311323
312324
See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo
325+
worker_disk: str
326+
The amount of ephemeral storage to allocate to each worker. This value must be lower than worker_mem.
313327
worker_threads: int
314328
The number of threads to use on each worker.
315329
debug: bool, optional
@@ -347,9 +361,11 @@ class IBMCodeEngineCluster(VMCluster):
347361
Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0
348362
Scheduler CPU: 0.25
349363
Scheduler Memory: 1G
364+
Scheduler Disk: 400M
350365
Scheduler Timeout: 600
351366
Worker CPU: 2
352367
Worker Memory: 4G
368+
Worker Disk: 400M
353369
Creating scheduler dask-xxxxxxxx-scheduler
354370
Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443
355371
Scheduler is running
@@ -382,9 +398,12 @@ class IBMCodeEngineCluster(VMCluster):
382398
Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0
383399
Scheduler CPU: 0.25
384400
Scheduler Memory: 1G
401+
Scheduler Disk: 400M
385402
Scheduler Timeout: 600
386403
Worker CPU: 2
387404
Worker Memory: 4G
405+
Worker Disk: 400M
406+
Worker Threads: 1
388407
Creating scheduler dask-xxxxxxxx-scheduler
389408
Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443
390409
Scheduler is running
@@ -402,9 +421,11 @@ def __init__(
402421
project_id: str = None,
403422
scheduler_cpu: str = None,
404423
scheduler_mem: str = None,
424+
scheduler_disk: str = None,
405425
scheduler_timeout: int = None,
406426
worker_cpu: str = None,
407427
worker_mem: str = None,
428+
worker_disk: str = None,
408429
worker_threads: int = 1,
409430
debug: bool = False,
410431
**kwargs,
@@ -419,11 +440,13 @@ def __init__(
419440
api_key = self.config.get("api_key")
420441
self.scheduler_cpu = scheduler_cpu or self.config.get("scheduler_cpu")
421442
self.scheduler_mem = scheduler_mem or self.config.get("scheduler_mem")
443+
self.scheduler_disk = scheduler_disk or self.config.get("scheduler_disk")
422444
self.scheduler_timeout = scheduler_timeout or self.config.get(
423445
"scheduler_timeout"
424446
)
425447
self.worker_cpu = worker_cpu or self.config.get("worker_cpu")
426448
self.worker_mem = worker_mem or self.config.get("worker_mem")
449+
self.worker_disk = worker_disk or self.config.get("worker_disk")
427450
self.worker_threads = worker_threads or self.config.get("worker_threads")
428451

429452
self.debug = debug
@@ -436,9 +459,11 @@ def __init__(
436459
"project_id": self.project_id,
437460
"scheduler_cpu": self.scheduler_cpu,
438461
"scheduler_mem": self.scheduler_mem,
462+
"scheduler_disk": self.scheduler_disk,
439463
"scheduler_timeout": self.scheduler_timeout,
440464
"worker_cpu": self.worker_cpu,
441465
"worker_mem": self.worker_mem,
466+
"worker_disk": self.worker_disk,
442467
"worker_threads": self.worker_threads,
443468
"api_key": api_key,
444469
}

0 commit comments

Comments
 (0)