distributed.scheduler - ERROR - Workers don't have promised key #127


Closed
jjerphan opened this issue Mar 6, 2019 · 12 comments

@jjerphan

jjerphan commented Mar 6, 2019

Problem description:

I am trying to run a computation on Kubernetes using a KubeCluster created from a YAML pod description, exactly like the simple example given in the documentation.

The snippet manages to create the pod on the cluster, and the pod runs correctly, with some warnings for my simple setup (see below):

In [1]: import dask.array as da
   ...: get_ipython().system(' cat worker-spec.yml')
   ...: from dask_kubernetes import KubeCluster
   ...: cluster = KubeCluster.from_yaml('worker-spec.yml')
   ...: cluster.scale(1)
   ...: from dask.distributed import Client
   ...: client = Client(cluster)
kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: daskdev/dask:latest
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, '2', --no-bokeh, --memory-limit, 6GB, --death-timeout, '60']
    name: dask
    env:
      - name: EXTRA_PIP_PACKAGES
        value: fastparquet git+https://github.com/dask/distributed
    resources:
      limits:
        cpu: "2"
        memory: 6G
      requests:
        cpu: "2"
        memory: 6G
/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)
/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)
/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)

The pod is Running on a node; from the pod logs:

 + dask-worker --nthreads 2 --no-bokeh --memory-limit 6GB --death-timeout 60
distributed.nanny - INFO -         Start Nanny at: 'tcp://10.1.0.76:36787'
distributed.worker - INFO -       Start worker at:      tcp://10.1.0.76:41501
distributed.worker - INFO -          Listening to:      tcp://10.1.0.76:41501
distributed.worker - INFO -              nanny at:            10.1.0.76:36787
distributed.worker - INFO - Waiting to connect to: tcp://192.168.10.123:53078
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                    6.00 GB
distributed.worker - INFO -       Local Directory:           /worker-vqbrmmi0
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to: tcp://192.168.10.123:53078
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

However, when running compute on the array:

In [2]: array = da.ones((1000,1000), chunks=(100,100))

In [3]: array.mean().compute()

it hangs and then throws:

distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://10.1.0.76:41501'], ('mean_agg-aggregate-b6d9a3f9f37890d8fbfe271bb5244b6f',)
NoneType: None
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {"('mean_agg-aggregate-b6d9a3f9f37890d8fbfe271bb5244b6f',)": ('tcp://10.1.0.76:41501',)}

The connection is then closed and the pod is marked as Completed; from the pod logs:

...
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.nanny - INFO - Closing Nanny at 'tcp://10.1.0.76:36787'
distributed.dask_worker - INFO - End worker 

I am inspecting this right now.
Let me know if I need to be more explicit or precise.


Setup:

  • OS: macOS Mojave (Darwin Kernel Version 18.2.0: Thu Dec 20 20:46:53 PST 2018; root:xnu-4903.241.1~1/RELEASE_X86_64 x86_64)
  • Local Kubernetes cluster on Docker Desktop with 3 CPUs and 6 GiB of memory allocated.

Details:

Typical pod description
$ kubectl describe pod # just one present and completed
Name:         dask-jjerphanion-5b89d6dc-fbb42p
Namespace:    default
Node:         docker-for-desktop/192.168.65.3
Start Time:   Wed, 06 Mar 2019 09:27:01 +0100
Labels:       app=dask
              component=dask-worker
              dask.org/cluster-name=dask-jjerphanion-5b89d6dc-f
              foo=bar
              user=jjerphanion
Annotations:  
Status:       Succeeded
IP:           10.1.0.76
Containers:
  dask:
    Container ID:  docker://b800fdb628987a254d60c6223e4a53256afc8e2a7b35db109b8a62fa57a47f24
    Image:         daskdev/dask:latest
    Image ID:      docker-pullable://daskdev/dask@sha256:581446064658cc5a2f6b19b77bce6d0ad239618a15cdcefc6d3b1ee73ae5848a
    Port:          
    Host Port:     
    Args:
      dask-worker
      --nthreads
      2
      --no-bokeh
      --memory-limit
      6GB
      --death-timeout
      60
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Wed, 06 Mar 2019 09:27:03 +0100
      Finished:     Wed, 06 Mar 2019 09:28:20 +0100
    Ready:          False
    Restart Count:  0
    Limits:
      cpu:     2
      memory:  6G
    Requests:
      cpu:     2
      memory:  6G
    Environment:
      EXTRA_PIP_PACKAGES:      fastparquet git+https://github.com/dask/distributed
      DASK_SCHEDULER_ADDRESS:  tcp://192.168.10.123:53078
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-7fxsz (ro)
Conditions:
  Type           Status
  Initialized    True
  Ready          False
  PodScheduled   True
Volumes:
  default-token-7fxsz:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-7fxsz
    Optional:    false
QoS Class:       Guaranteed
Node-Selectors:  
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:
  Type    Reason                 Age   From                         Message
  ----    ------                 ----  ----                         -------
  Normal  Scheduled              12m   default-scheduler            Successfully assigned dask-jjerphanion-5b89d6dc-fbb42p to docker-for-desktop
  Normal  SuccessfulMountVolume  12m   kubelet, docker-for-desktop  MountVolume.SetUp succeeded for volume "default-token-7fxsz"
  Normal  Pulled                 12m   kubelet, docker-for-desktop  Container image "daskdev/dask:latest" already present on machine
  Normal  Created                12m   kubelet, docker-for-desktop  Created container
  Normal  Started                12m   kubelet, docker-for-desktop  Started container
Typical node description
$ kubectl describe node # again just one running locally
Name:               docker-for-desktop
Roles:              master
Labels:             beta.kubernetes.io/arch=amd64
                    beta.kubernetes.io/os=linux
                    kubernetes.io/hostname=docker-for-desktop
                    node-role.kubernetes.io/master=
Annotations:        node.alpha.kubernetes.io/ttl: 0
                    volumes.kubernetes.io/controller-managed-attach-detach: true
CreationTimestamp:  Mon, 04 Mar 2019 09:52:32 +0100
Taints:             
Unschedulable:      false
Conditions:
  Type             Status  LastHeartbeatTime                 LastTransitionTime                Reason                       Message
  ----             ------  -----------------                 ------------------                ------                       -------
  OutOfDisk        False   Wed, 06 Mar 2019 09:46:36 +0100   Mon, 04 Mar 2019 09:52:26 +0100   KubeletHasSufficientDisk     kubelet has sufficient disk space available
  MemoryPressure   False   Wed, 06 Mar 2019 09:46:36 +0100   Mon, 04 Mar 2019 09:52:26 +0100   KubeletHasSufficientMemory   kubelet has sufficient memory available
  DiskPressure     False   Wed, 06 Mar 2019 09:46:36 +0100   Mon, 04 Mar 2019 09:52:26 +0100   KubeletHasNoDiskPressure     kubelet has no disk pressure
  PIDPressure      False   Wed, 06 Mar 2019 09:46:36 +0100   Mon, 04 Mar 2019 09:52:26 +0100   KubeletHasSufficientPID      kubelet has sufficient PID available
  Ready            True    Wed, 06 Mar 2019 09:46:36 +0100   Tue, 05 Mar 2019 19:48:29 +0100   KubeletReady                 kubelet is posting ready status
Addresses:
  InternalIP:  192.168.65.3
  Hostname:    docker-for-desktop
Capacity:
 cpu:                3
 ephemeral-storage:  61255492Ki
 hugepages-1Gi:      0
 hugepages-2Mi:      0
 memory:             6100484Ki
 pods:               110
Allocatable:
 cpu:                3
 ephemeral-storage:  56453061334
 hugepages-1Gi:      0
 hugepages-2Mi:      0
 memory:             5998084Ki
 pods:               110
System Info:
 Machine ID:
 System UUID:                268147C3-0000-0000-B7EE-26C7CF1F759A
 Boot ID:                    6b8b2553-5adc-462f-89a5-0023d7f04845
 Kernel Version:             4.9.125-linuxkit
 OS Image:                   Docker for Mac
 Operating System:           linux
 Architecture:               amd64
 Container Runtime Version:  docker://18.9.2
 Kubelet Version:            v1.10.11
 Kube-Proxy Version:         v1.10.11
Non-terminated Pods:         (10 in total)
  Namespace                  Name                                          CPU Requests  CPU Limits  Memory Requests  Memory Limits  AGE
  ---------                  ----                                          ------------  ----------  ---------------  -------------  ---
  docker                     compose-74649b4db6-t2697                      0 (0%)        0 (0%)      0 (0%)           0 (0%)         39h
  docker                     compose-api-644c56cbdc-jslvm                  0 (0%)        0 (0%)      0 (0%)           0 (0%)         39h
  kube-system                etcd-docker-for-desktop                       0 (0%)        0 (0%)      0 (0%)           0 (0%)         47h
  kube-system                kube-apiserver-docker-for-desktop             250m (8%)     0 (0%)      0 (0%)           0 (0%)         47h
  kube-system                kube-controller-manager-docker-for-desktop    200m (6%)     0 (0%)      0 (0%)           0 (0%)         47h
  kube-system                kube-dns-86f4d74b45-lxsjz                     260m (8%)     0 (0%)      110Mi (1%)       170Mi (2%)     47h
  kube-system                kube-proxy-4qsg5                              0 (0%)        0 (0%)      0 (0%)           0 (0%)         47h
  kube-system                kube-scheduler-docker-for-desktop             100m (3%)     0 (0%)      0 (0%)           0 (0%)         47h
  kube-system                kubernetes-dashboard-669f9bbd46-vpcxl         0 (0%)        0 (0%)      0 (0%)           0 (0%)         16h
  kube-system                tiller-deploy-9cb565677-56mhw                 0 (0%)        0 (0%)      0 (0%)           0 (0%)         47h
Allocated resources:
  (Total limits may be over 100 percent, i.e., overcommitted.)
  Resource           Requests    Limits
  --------           --------    ------
  cpu                810m (27%)  0 (0%)
  memory             110Mi (1%)  170Mi (2%)
  ephemeral-storage  0 (0%)      0 (0%)
Events:
  Type    Reason        Age   From                         Message
  ----    ------        ----  ----                         -------
  Normal  NodeNotReady  13h   kubelet, docker-for-desktop  Node docker-for-desktop status is now: NodeNotReady
  Normal  NodeReady     13h   kubelet, docker-for-desktop  Node docker-for-desktop status is now: NodeReady
Full typical IPython trace
Python 3.7.1 (default, Dec 14 2018, 13:28:58)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.3.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: import dask.array as da

In [2]: ! cat worker-spec.yml
kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: daskdev/dask:latest
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, '2', --no-bokeh, --memory-limit, 6GB, --death-timeout, '60']
    name: dask
    env:
      - name: EXTRA_PIP_PACKAGES
        value: fastparquet git+https://github.com/dask/distributed
    resources:
      limits:
        cpu: "2"
        memory: 6G
      requests:
        cpu: "2"
        memory: 6G

In [3]: from dask_kubernetes import KubeCluster
...: cluster = KubeCluster.from_yaml('worker-spec.yml')

In [4]: cluster.scale(1)
/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
InsecureRequestWarning)
/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
InsecureRequestWarning)
/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
InsecureRequestWarning)
Out[4]:
[{'api_version': 'v1',
'kind': 'Pod',
'metadata': {'annotations': None,
'cluster_name': None,
'creation_timestamp': datetime.datetime(2019, 3, 5, 16, 2, 1, tzinfo=tzutc()),
'deletion_grace_period_seconds': None,
'deletion_timestamp': None,
'finalizers': None,
'generate_name': 'dask-jjerphanion-182c0701-7',
'generation': None,
'initializers': None,
'labels': {'app': 'dask',
'component': 'dask-worker',
'dask.org/cluster-name': 'dask-jjerphanion-182c0701-7',
'foo': 'bar',
'user': 'jjerphanion'},
'name': 'dask-jjerphanion-182c0701-72jk8q',
'namespace': 'default',
'owner_references': None,
'resource_version': '49676',
'self_link': '/api/v1/namespaces/default/pods/dask-jjerphanion-182c0701-72jk8q',
'uid': '02fd2b65-3f60-11e9-91a3-025000000001'},
'spec': {'active_deadline_seconds': None,
'affinity': None,
'automount_service_account_token': None,
'containers': [{'args': ['dask-worker',
'--nthreads',
'2',
'--no-bokeh',
'--memory-limit',
'6GB',
'--death-timeout',
'60'],
'command': None,
'env': [{'name': 'EXTRA_PIP_PACKAGES',
'value': 'fastparquet '
'git+https://github.com/dask/distributed',
'value_from': None},
{'name': 'DASK_SCHEDULER_ADDRESS',
'value': 'tcp://192.168.10.123:65514',
'value_from': None}],
'env_from': None,
'image': 'daskdev/dask:latest',
'image_pull_policy': 'IfNotPresent',
'lifecycle': None,
'liveness_probe': None,
'name': 'dask',
'ports': None,
'readiness_probe': None,
'resources': {'limits': {'cpu': '2', 'memory': '6G'},
'requests': {'cpu': '2',
'memory': '6G'}},
'security_context': None,
'stdin': None,
'stdin_once': None,
'termination_message_path': '/dev/termination-log',
'termination_message_policy': 'File',
'tty': None,
'volume_devices': None,
'volume_mounts': [{'mount_path': '/var/run/secrets/kubernetes.io/serviceaccount',
'mount_propagation': None,
'name': 'default-token-7fxsz',
'read_only': True,
'sub_path': None}],
'working_dir': None}],
'dns_config': None,
'dns_policy': 'ClusterFirst',
'host_aliases': None,
'host_ipc': None,
'host_network': None,
'host_pid': None,
'hostname': None,
'image_pull_secrets': None,
'init_containers': None,
'node_name': None,
'node_selector': None,
'priority': None,
'priority_class_name': None,
'readiness_gates': None,
'restart_policy': 'Never',
'runtime_class_name': None,
'scheduler_name': 'default-scheduler',
'security_context': {'fs_group': None,
'run_as_group': None,
'run_as_non_root': None,
'run_as_user': None,
'se_linux_options': None,
'supplemental_groups': None,
'sysctls': None},
'service_account': 'default',
'service_account_name': 'default',
'share_process_namespace': None,
'subdomain': None,
'termination_grace_period_seconds': 30,
'tolerations': [{'effect': 'NoExecute',
'key': 'node.kubernetes.io/not-ready',
'operator': 'Exists',
'toleration_seconds': 300,
'value': None},
{'effect': 'NoExecute',
'key': 'node.kubernetes.io/unreachable',
'operator': 'Exists',
'toleration_seconds': 300,
'value': None}],
'volumes': [{'aws_elastic_block_store': None,
'azure_disk': None,
'azure_file': None,
'cephfs': None,
'cinder': None,
'config_map': None,
'downward_api': None,
'empty_dir': None,
'fc': None,
'flex_volume': None,
'flocker': None,
'gce_persistent_disk': None,
'git_repo': None,
'glusterfs': None,
'host_path': None,
'iscsi': None,
'name': 'default-token-7fxsz',
'nfs': None,
'persistent_volume_claim': None,
'photon_persistent_disk': None,
'portworx_volume': None,
'projected': None,
'quobyte': None,
'rbd': None,
'scale_io': None,
'secret': {'default_mode': 420,
'items': None,
'optional': None,
'secret_name': 'default-token-7fxsz'},
'storageos': None,
'vsphere_volume': None}]},
'status': {'conditions': None,
'container_statuses': None,
'host_ip': None,
'init_container_statuses': None,
'message': None,
'nominated_node_name': None,
'phase': 'Pending',
'pod_ip': None,
'qos_class': 'Guaranteed',
'reason': None,
'start_time': None}}]

In [5]: from dask.distributed import Client
...: client = Client(cluster)

In [6]: array = da.ones((1000, 1000, 1000), chunks=(100, 100, 10))

In [7]: print(array.mean().compute()) # Should print 1.0
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://10.1.0.70:33625'], ('mean_agg-aggregate-e4e07ab83fc0bd725ffa4d6ba65f1e21',)
NoneType: None
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {"('mean_agg-aggregate-e4e07ab83fc0bd725ffa4d6ba65f1e21',)": ('tcp://10.1.0.70:33625',)}

Typical Pod logs
error: command 'gcc' failed with exit status 1

Failed building wheel for fastparquet
Running setup.py clean for fastparquet
Running setup.py bdist_wheel for distributed: started
Running setup.py bdist_wheel for distributed: finished with status 'done'
Stored in directory: /tmp/pip-ephem-wheel-cache-2z3zvorq/wheels/aa/21/a7/d9548d684f8e074360b7ad1bd8633843dba9658288b68b3dd5
Running setup.py bdist_wheel for thrift: started
Running setup.py bdist_wheel for thrift: finished with status 'done'
Stored in directory: /root/.cache/pip/wheels/be/36/81/0f93ba89a1cb7887c91937948519840a72c0ffdd57cac0ae8f
Successfully built distributed thrift
Failed to build fastparquet
Installing collected packages: llvmlite, numba, thrift, pytest-runner, fastparquet, distributed
Running setup.py install for fastparquet: started
Running setup.py install for fastparquet: finished with status 'error'
Complete output from command /opt/conda/bin/python -u -c "import setuptools, tokenize;__file__='/tmp/pip-install-tt7ywwvd/fastparquet/setup.py';f=getattr(tokenize, 'open', open)(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))" install --record /tmp/pip-record-qhsg1gjo/install-record.txt --single-version-externally-managed --compile:
running install
running build
running build_py
running egg_info
writing fastparquet.egg-info/PKG-INFO
writing dependency_links to fastparquet.egg-info/dependency_links.txt
writing requirements to fastparquet.egg-info/requires.txt
writing top-level names to fastparquet.egg-info/top_level.txt
reading manifest file 'fastparquet.egg-info/SOURCES.txt'
reading manifest template 'MANIFEST.in'
no previously-included directories found matching 'docs/_build'
writing manifest file 'fastparquet.egg-info/SOURCES.txt'
running build_ext
building 'fastparquet.speedups' extension
creating build/temp.linux-x86_64-3.7
creating build/temp.linux-x86_64-3.7/fastparquet
gcc -pthread -B /opt/conda/compiler_compat -Wl,--sysroot=/ -Wsign-compare -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -fPIC -I/opt/conda/include/python3.7m -I/opt/conda/lib/python3.7/site-packages/numpy/core/include -c fastparquet/speedups.c -o build/temp.linux-x86_64-3.7/fastparquet/speedups.o
unable to execute 'gcc': No such file or directory
error: command 'gcc' failed with exit status 1

----------------------------------------

Command "/opt/conda/bin/python -u -c "import setuptools, tokenize;file='/tmp/pip-install-tt7ywwvd/fastparquet/setup.py';f=getattr(tokenize, 'open', open)(file);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, file, 'exec'))" install --record /tmp/pip-record-qhsg1gjo/install-record.txt --single-version-externally-managed --compile" failed with error code 1 in /tmp/pip-install-tt7ywwvd/fastparquet/
You are using pip version 10.0.1, however version 19.0.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

+ dask-worker --nthreads 2 --no-bokeh --memory-limit 6GB --death-timeout 60
distributed.nanny - INFO - Start Nanny at: 'tcp://10.1.0.76:36787'
distributed.worker - INFO - Start worker at: tcp://10.1.0.76:41501
distributed.worker - INFO - Listening to: tcp://10.1.0.76:41501
distributed.worker - INFO - nanny at: 10.1.0.76:36787
distributed.worker - INFO - Waiting to connect to: tcp://192.168.10.123:53078
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 2
distributed.worker - INFO - Memory: 6.00 GB
distributed.worker - INFO - Local Directory: /worker-vqbrmmi0
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.10.123:53078
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Stopping worker at tcp://10.1.0.76:41501
distributed.nanny - INFO - Closing Nanny at 'tcp://10.1.0.76:36787'
distributed.dask_worker - INFO - End worker
@jjerphan
Author

jjerphan commented Mar 6, 2019

The tasks are being run according to the dashboard.
[Screenshot: Dask dashboard status page, 2019-03-06]

By the way, there are problems installing fastparquet (see the pod logs above); I don't know whether this is related to this problem.

@mrocklin
Member

mrocklin commented Mar 6, 2019

My first guess is a version mismatch between your client and workers. I recommend

client.get_versions(check=True)
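
For example, a minimal sketch (assuming the cluster object created in the snippets above) that runs the check right after connecting the client, so a client/scheduler/worker mismatch is reported before any work is submitted:

# Hedged sketch: compare package versions across client, scheduler and workers.
# Depending on the distributed version, check=True raises or only warns when
# required packages differ.
from dask.distributed import Client

client = Client(cluster)                    # `cluster` is the KubeCluster created earlier
versions = client.get_versions(check=True)  # {'scheduler': ..., 'workers': ..., 'client': ...}
print(versions["client"]["packages"])       # required/optional package versions on the client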

@jjerphan
Author

jjerphan commented Mar 6, 2019

I've tried this in an IPython interpreter; it can't even connect to the worker now.

I'll try using Minikube instead of Docker Desktop and see if that resolves the problem.

In [1]: from dask.distributed import Client
In [2]: from dask_kubernetes import KubeCluster
In [3]: ! cat notebooks/worker-spec.yml
kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: daskdev/dask:latest
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, '2', --no-bokeh, --memory-limit, 6GB, --death-timeout, '60']
    name: dask
    #env:
        #  - name: EXTRA_PIP_PACKAGES
        #    value: fastparquet git+https://github.com/dask/distributed
    resources:
      limits:
        cpu: "2"
        memory: 6G
      requests:
        cpu: "2"
        memory: 6G

In [4]: cluster = KubeCluster.from_yaml('notebooks/worker-spec.yml')

In [5]: cluster.scale(1)
/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)
/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)
/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)
Out[5]:
[{'api_version': 'v1',
  'kind': 'Pod',
  'metadata': {'annotations': None,
               'cluster_name': None,
               'creation_timestamp': datetime.datetime(2019, 3, 6, 16, 15, 1, tzinfo=tzutc()),
               'deletion_grace_period_seconds': None,
               'deletion_timestamp': None,
               'finalizers': None,
               'generate_name': 'dask-jjerphanion-995cfd3b-5',
               'generation': None,
               'initializers': None,
               'labels': {'app': 'dask',
                          'component': 'dask-worker',
                          'dask.org/cluster-name': 'dask-jjerphanion-995cfd3b-5',
                          'foo': 'bar',
                          'user': 'jjerphanion'},
               'name': 'dask-jjerphanion-995cfd3b-5vzkkq',
               'namespace': 'default',
               'owner_references': None,
               'resource_version': '85638',
               'self_link': '/api/v1/namespaces/default/pods/dask-jjerphanion-995cfd3b-5vzkkq',
               'uid': 'fe5e744c-402a-11e9-91a3-025000000001'},
  'spec': {'active_deadline_seconds': None,
           'affinity': None,
           'automount_service_account_token': None,
           'containers': [{'args': ['dask-worker',
                                    '--nthreads',
                                    '2',
                                    '--no-bokeh',
                                    '--memory-limit',
                                    '6GB',
                                    '--death-timeout',
                                    '60'],
                           'command': None,
                           'env': [{'name': 'DASK_SCHEDULER_ADDRESS',
                                    'value': 'tcp://192.168.10.123:59802',
                                    'value_from': None}],
                           'env_from': None,
                           'image': 'daskdev/dask:latest',
                           'image_pull_policy': 'IfNotPresent',
                           'lifecycle': None,
                           'liveness_probe': None,
                           'name': 'dask',
                           'ports': None,
                           'readiness_probe': None,
                           'resources': {'limits': {'cpu': '2', 'memory': '6G'},
                                         'requests': {'cpu': '2',
                                                      'memory': '6G'}},
                           'security_context': None,
                           'stdin': None,
                           'stdin_once': None,
                           'termination_message_path': '/dev/termination-log',
                           'termination_message_policy': 'File',
                           'tty': None,
                           'volume_devices': None,
                           'volume_mounts': [{'mount_path': '/var/run/secrets/kubernetes.io/serviceaccount',
                                              'mount_propagation': None,
                                              'name': 'default-token-7fxsz',
                                              'read_only': True,
                                              'sub_path': None}],
                           'working_dir': None}],
           'dns_config': None,
           'dns_policy': 'ClusterFirst',
           'host_aliases': None,
           'host_ipc': None,
           'host_network': None,
           'host_pid': None,
           'hostname': None,
           'image_pull_secrets': None,
           'init_containers': None,
           'node_name': None,
           'node_selector': None,
           'priority': None,
           'priority_class_name': None,
           'readiness_gates': None,
           'restart_policy': 'Never',
           'runtime_class_name': None,
           'scheduler_name': 'default-scheduler',
           'security_context': {'fs_group': None,
                                'run_as_group': None,
                                'run_as_non_root': None,
                                'run_as_user': None,
                                'se_linux_options': None,
                                'supplemental_groups': None,
                                'sysctls': None},
           'service_account': 'default',
           'service_account_name': 'default',
           'share_process_namespace': None,
           'subdomain': None,
           'termination_grace_period_seconds': 30,
           'tolerations': [{'effect': 'NoExecute',
                            'key': 'node.kubernetes.io/not-ready',
                            'operator': 'Exists',
                            'toleration_seconds': 300,
                            'value': None},
                           {'effect': 'NoExecute',
                            'key': 'node.kubernetes.io/unreachable',
                            'operator': 'Exists',
                            'toleration_seconds': 300,
                            'value': None}],
           'volumes': [{'aws_elastic_block_store': None,
                        'azure_disk': None,
                        'azure_file': None,
                        'cephfs': None,
                        'cinder': None,
                        'config_map': None,
                        'downward_api': None,
                        'empty_dir': None,
                        'fc': None,
                        'flex_volume': None,
                        'flocker': None,
                        'gce_persistent_disk': None,
                        'git_repo': None,
                        'glusterfs': None,
                        'host_path': None,
                        'iscsi': None,
                        'name': 'default-token-7fxsz',
                        'nfs': None,
                        'persistent_volume_claim': None,
                        'photon_persistent_disk': None,
                        'portworx_volume': None,
                        'projected': None,
                        'quobyte': None,
                        'rbd': None,
                        'scale_io': None,
                        'secret': {'default_mode': 420,
                                   'items': None,
                                   'optional': None,
                                   'secret_name': 'default-token-7fxsz'},
                        'storageos': None,
                        'vsphere_volume': None}]},
  'status': {'conditions': None,
             'container_statuses': None,
             'host_ip': None,
             'init_container_statuses': None,
             'message': None,
             'nominated_node_name': None,
             'phase': 'Pending',
             'pod_ip': None,
             'qos_class': 'Guaranteed',
             'reason': None,
             'start_time': None}}]

In [6]: client = Client(cluster)

In [7]: print(client.get_versions(check=True))
distributed.core - ERROR - Timed out trying to connect to 'tcp://10.1.0.105:37897' after 10 s: connect() didn't finish in time
Traceback (most recent call last):
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/comm/core.py", line 204, in connect
    quiet_exceptions=EnvironmentError)
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
tornado.util.TimeoutError: Timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/core.py", line 346, in handle_comm
    result = yield result
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/scheduler.py", line 2446, in broadcast
    for address in addresses
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/utils.py", line 214, in All
    result = yield tasks.next()
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/scheduler.py", line 2441, in send_message
    connection_args=self.connection_args)
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/comm/core.py", line 215, in connect
    _raise(error)
  File "/Users/jjerphanion/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/comm/core.py", line 195, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://10.1.0.105:37897' after 10 s: connect() didn't finish in time
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-7-43681d857331> in <module>
----> 1 print(client.get_versions(check=True))

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/client.py in get_versions(self, check, packages)
   3232
   3233         workers = sync(self.loop, self.scheduler.broadcast,
-> 3234                        msg={'op': 'versions', 'packages': packages})
   3235         result = {'scheduler': scheduler, 'workers': workers, 'client': client}
   3236

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    578             try:
    579                 comm = yield self.live_comm()
--> 580                 result = yield send_recv(comm=comm, op=key, **kwargs)
    581             except (RPCClosed, CommClosedError) as e:
    582                 raise e.__class__("%s: while trying to call remote method %r"

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1145                             exc_info = None
   1146                     else:
-> 1147                         yielded = self.gen.send(value)
   1148
   1149                     if stack_context._state.contexts is not orig_stack_contexts:

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    470     if isinstance(response, dict) and response.get('status') == 'uncaught-error':
    471         if comm.deserialize:
--> 472             six.reraise(*clean_exception(**response))
    473         else:
    474             raise Exception(response['text'])

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/core.py in handle_comm()
    344                         if type(result) is gen.Future:
    345                             self._ongoing_coroutines.add(result)
--> 346                             result = yield result
    347                     except (CommClosedError, CancelledError) as e:
    348                         if self.status == 'running':

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run()
   1131
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run()
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/scheduler.py in broadcast()
   2444
   2445         results = yield All([send_message(address)
-> 2446                              for address in addresses
   2447                              if address is not None])
   2448

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run()
   1131
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run()
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/utils.py in All()
    212     while not tasks.done():
    213         try:
--> 214             result = yield tasks.next()
    215         except Exception:
    216             @gen.coroutine

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run()
   1131
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run()
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/scheduler.py in send_message()
   2439         def send_message(addr):
   2440             comm = yield connect(addr, deserialize=self.deserialize,
-> 2441                                  connection_args=self.connection_args)
   2442             resp = yield send_recv(comm, close=True, serializers=serializers, **msg)
   2443             raise gen.Return(resp)

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run()
   1131
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/tornado/gen.py in run()
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/comm/core.py in connect()
    213                 _raise(error)
    214         except gen.TimeoutError:
--> 215             _raise(error)
    216         else:
    217             break

~/.virtualenvs/dask_explo/lib/python3.7/site-packages/distributed/comm/core.py in _raise()
    193         msg = ("Timed out trying to connect to %r after %s s: %s"
    194                % (addr, timeout, error))
--> 195         raise IOError(msg)
    196
    197     # This starts a thread

OSError: Timed out trying to connect to 'tcp://10.1.0.105:37897' after 10 s: connect() didn't finish in time

@mrocklin
Member

mrocklin commented Mar 6, 2019

Yeah, I suspect that you just can't connect properly. My guess is that wherever you're running your KubeCluster is unable to open up connections to your workers.

Note that today dask-kubernetes is typically run from within the Kubernetes cluster. You may want to track #84

@jjerphan
Author

jjerphan commented Mar 6, 2019

Running a Python interpreter directly in the container:

Python 3.7.2 (default, Dec 29 2018, 06:19:36)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from distributed.versions import get_versions
>>> get_versions()
{'host': [('python', '3.7.2.final.0'), 
('python-bits', 64),
('OS', 'Linux'),
('OS-release', '4.9.125-linuxkit'),
('machine', 'x86_64'),
('processor', ''),
('byteorder', 'little'),
('LC_ALL', 'C.UTF-8'),
('LANG', 'C.UTF-8'),
('LOCALE', 'en_US.UTF-8')],
'packages': 
{'required': [('dask', '1.1.2'), ('distributed', '1.26.0'), ('msgpack', '0.6.1'), 
('cloudpickle', '0.8.0'), ('tornado', '5.1.1'), ('toolz', '0.9.0')], 
'optional': [('numpy', '1.16.0'), ('pandas', '0.24.0'), ('bokeh', '1.0.4'),
('lz4', None), ('dask_ml', None), ('blosc', '1.7.0')]}}

vs locally on my machine:

Python 3.7.1 (default, Dec 14 2018, 13:28:58)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.3.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from distributed.versions import get_versions

In [2]: get_versions()
Out[2]:
{'host': [('python', '3.7.1.final.0'),
  ('python-bits', 64),
  ('OS', 'Darwin'),
  ('OS-release', '18.2.0'),
  ('machine', 'x86_64'),
  ('processor', 'i386'),
  ('byteorder', 'little'),
  ('LC_ALL', 'en_US.UTF-8'),
  ('LANG', 'en_US.UTF-8'),
  ('LOCALE', 'en_US.UTF-8')],
 'packages': {'required': [('dask', '1.1.2'),
   ('distributed', '1.26.0'),
   ('msgpack', '0.6.1'),
   ('cloudpickle', '0.8.0'),
   ('tornado', '5.1.1'),
   ('toolz', '0.9.0')],
  'optional': [('numpy', '1.16.2'),
   ('pandas', '0.23.4'),
   ('bokeh', '1.0.4'),
   ('lz4', None),
   ('dask_ml', None),
   ('blosc', None)]}}

@jjerphan
Author

jjerphan commented Mar 6, 2019

Yeah, I suspect that you just can't connect properly. My guess is that wherever you're running your KubeCluster is unable to open up connections to your workers.

Yeah, but jobs seem to be executed on the worker when running compute.

@mrocklin
Member

mrocklin commented Mar 6, 2019

Yes, connections are necessary both ways. Your workers can contact the scheduler, but the scheduler can't contact the workers. Different parts of the operation will work and not work depending on which kind of connection they rely on.

In short, today we need the KubeCluster object to run on the Kubernetes cluster, or at least have full network access to all pods running on that cluster.

In the moderate future we'll extend dask-kubernetes to launch the scheduler on the cluster. At that point you'll be able to connect a client that's outside the cluster effectively. You should watch the issue pointed to above. Folks are working on this now. cc @beberg
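
A quick way to make the asymmetry visible is a plain TCP probe from the machine running the client/scheduler to the worker addresses the scheduler knows about. This is a minimal sketch (assuming the cluster object from the snippets above), not an official diagnostic:

# Hedged sketch: check whether this machine can open a TCP connection to each
# worker address registered with the scheduler.
import socket

from dask.distributed import Client

client = Client(cluster)  # `cluster` is the KubeCluster created earlier

for addr in client.scheduler_info()["workers"]:
    host, port = addr.replace("tcp://", "").rsplit(":", 1)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)
    try:
        sock.connect((host, int(port)))
        print("reachable:", addr)
    except OSError as exc:
        print("unreachable:", addr, exc)
    finally:
        sock.close()

In the setup described in this issue, the pod IPs (10.1.0.x) are generally not routable from the macOS host, so the workers would show up as unreachable here even though they can still register with the scheduler.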

@jjerphan
Author

jjerphan commented Mar 6, 2019

OK, I see. Thanks, I will watch that issue and look into it further if I can.

@jjerphan
Author

What I've done to fix the problem locally and temporarily is:

  • to port forward the workers' pods
  • to add a route for each worker's private IP

@ncdejito

@jjerphan would you mind sharing more details on your fix?

  • to port forward the workers' pods - can it be any port?
  • to add a route for each worker's private IP - is this done through iptables? Is there a way to do this programmatically (see the sketch below)? I'm currently running "kubectl logs -f", getting the worker IP, and adding a route to localhost (the scheduler) via iptables, quickly before the pod times out trying to connect, and I race through all the pods. Is this the right way to do it?
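
For reference, a sketch of listing the worker pods' private IPs programmatically with the official kubernetes Python client rather than tailing logs; the label value is just an example taken from the pod description earlier in this issue, and the default local kubeconfig is assumed:

# Hedged sketch: list Dask worker pod names and private IPs by label.
from kubernetes import client as k8s_client, config as k8s_config

k8s_config.load_kube_config()  # assumes a local kubeconfig pointing at the cluster
v1 = k8s_client.CoreV1Api()
pods = v1.list_namespaced_pod(
    namespace="default",
    label_selector="dask.org/cluster-name=dask-jjerphanion-5b89d6dc-f",  # example value
)
for pod in pods.items:
    print(pod.metadata.name, pod.status.pod_ip)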

@jacobtomlinson
Member

@ncdejito the comment you are referring to is over 12 months old, so you may not get an answer here.

Are you experiencing a similar problem? You may want to open a new issue with a reproducible example so we can help you with it.

@jjerphan
Author

Hi @ncdejito.

Unfortunately, I don't remember and no longer have access to the code.

I second @jacobtomlinson's suggestion.
