diff --git a/ansible/README.md b/ansible/README.md
index ef458ae9d90..6865c6bb7c6 100644
--- a/ansible/README.md
+++ b/ansible/README.md
@@ -148,6 +148,58 @@ ansible-playbook -i environments/$ENVIRONMENT prereq.yml
 **Hint:** During playbook execution the `TASK [prereq : check for pip]` can show as failed. This is normal if no pip is installed. The playbook will then move on and install pip on the target machines.
 
+### [Optional] Enable the new scheduler
+
+You can enable the new OpenWhisk scheduler.
+It runs one additional component, called "scheduler", along with ETCD.
+
+#### Configure service providers for the scheduler
+Update the service providers for the scheduler as follows.
+
+**common/scala/src/main/resources**
+```
+whisk.spi {
+  ArtifactStoreProvider = org.apache.openwhisk.core.database.CouchDbStoreProvider
+  ActivationStoreProvider = org.apache.openwhisk.core.database.ArtifactActivationStoreProvider
+  MessagingProvider = org.apache.openwhisk.connector.kafka.KafkaMessagingProvider
+  ContainerFactoryProvider = org.apache.openwhisk.core.containerpool.docker.DockerContainerFactoryProvider
+  LogStoreProvider = org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider
+  LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.FPCPoolBalancer
+  EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.FPCEntitlementProvider
+  AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective
+  InvokerProvider = org.apache.openwhisk.core.invoker.FPCInvokerReactive
+  InvokerServerProvider = org.apache.openwhisk.core.invoker.FPCInvokerServer
+  DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.ElasticSearchDurationCheckerProvider
+}
+.
+.
+.
+```
+
+#### Enable the scheduler
+- Make sure the scheduler is enabled by setting `scheduler_enable`.
+
+**ansible/environments/local/group_vars**
+```yaml
+scheduler_enable: true
+```
+
+#### [Optional] Enable ElasticSearch Activation Store
+When using the new scheduler, it is recommended to use ElasticSearch as the activation store.
+
+**ansible/environments/local/group_vars**
+```yaml
+db_activation_backend: ElasticSearch
+elastic_cluster_name:
+elastic_protocol:
+elastic_index_pattern:
+elastic_base_volume:
+elastic_username:
+elastic_password:
+```
+
+You can also refer to this guide to [deploy OpenWhisk using ElasticSearch](https://github.com/apache/openwhisk/blob/master/ansible/README.md#using-elasticsearch-to-store-activations).
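+
+#### Deploy OpenWhisk with the scheduler enabled
+No extra playbook run is needed for the scheduler itself: when `scheduler_enable`
+is true, the main `openwhisk.yml` playbook also imports `etcd.yml` and
+`scheduler.yml`. A minimal sketch of a full deployment, assuming the standard
+CouchDB-based flow described in the sections below:
+
+```
+ansible-playbook -i environments/$ENVIRONMENT couchdb.yml
+ansible-playbook -i environments/$ENVIRONMENT initdb.yml
+ansible-playbook -i environments/$ENVIRONMENT wipe.yml
+ansible-playbook -i environments/$ENVIRONMENT openwhisk.yml
+```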
+ ### Deploying Using CouchDB - Make sure your `db_local.ini` file is [setup for](#setup) CouchDB then execute: diff --git a/ansible/environments/local/hosts.j2.ini b/ansible/environments/local/hosts.j2.ini index cceb7061f9e..26fce32d8b5 100644 --- a/ansible/environments/local/hosts.j2.ini +++ b/ansible/environments/local/hosts.j2.ini @@ -27,6 +27,12 @@ invoker0 ansible_host=172.17.0.1 ansible_connection=local invoker1 ansible_host=172.17.0.1 ansible_connection=local {% endif %} +[schedulers] +scheduler0 ansible_host=172.17.0.1 ansible_connection=local +{% if mode is defined and 'HA' in mode %} +scheduler1 ansible_host=172.17.0.1 ansible_connection=local +{% endif %} + ; db group is only used if db.provider is CouchDB [db] 172.17.0.1 ansible_host=172.17.0.1 ansible_connection=local diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 0f9b107b8d5..79d7eafef1a 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -127,6 +127,8 @@ jmx: rmiBasePortController: 16000 basePortInvoker: 17000 rmiBasePortInvoker: 18000 + basePortScheduler: 21000 + rmiBasePortScheduler: 22000 user: "{{ jmxuser | default('jmxuser') }}" pass: "{{ jmxuser | default('jmxpass') }}" jvmCommonArgs: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=true -Dcom.sun.management.jmxremote.password.file=/home/owuser/jmxremote.password -Dcom.sun.management.jmxremote.access.file=/home/owuser/jmxremote.access" @@ -221,6 +223,8 @@ invoker: keystore: password: "{{ invoker_keystore_password | default('openwhisk') }}" name: "{{ __invoker_ssl_keyPrefix }}openwhisk-keystore.p12" + container: + creationMaxPeek: "{{ container_creation_max_peek | default(500) }}" reactiveSpi: "{{ invokerReactive_spi | default('') }}" serverSpi: "{{ invokerServer_spi | default('') }}" @@ -278,6 +282,9 @@ db: invoker: user: "{{ db_invoker_user | default(lookup('ini', 'db_username section=invoker file={{ playbook_dir }}/db_local.ini')) }}" pass: "{{ db_invoker_pass | default(lookup('ini', 'db_password section=invoker file={{ playbook_dir }}/db_local.ini')) }}" + scheduler: + user: "{{ db_scheduler_user | default(lookup('ini', 'db_username section=scheduler file={{ playbook_dir }}/db_local.ini')) }}" + pass: "{{ db_scheduler_pass | default(lookup('ini', 'db_password section=scheduler file={{ playbook_dir }}/db_local.ini')) }}" artifact_store: backend: "{{ db_artifact_backend | default('CouchDB') }}" activation_store: @@ -435,8 +442,9 @@ metrics: user_events: "{{ user_events_enabled | default(false) | lower }}" -durationChecker: - timeWindow: "{{ duration_checker_time_window | default('1 d') }}" +zeroDowntimeDeployment: + enabled: "{{ zerodowntime_deployment_switch | default(true) }}" + solution: "{{ zerodowntime_deployment_solution | default('apicall') }}" etcd: version: "{{ etcd_version | default('v3.4.0') }}" @@ -463,13 +471,63 @@ etcd_connect_string: "{% set ret = [] %}\ {% endfor %}\ {{ ret | join(',') }}" + +__scheduler_blackbox_fraction: 0.10 + +watcher: + eventNotificationDelayMs: "{{ watcher_notification_delay | default('5000 ms') }}" + +durationChecker: + timeWindow: "{{ duration_checker_time_window | default('1 d') }}" + +enable_scheduler: "{{ scheduler_enable | default(false) }}" + scheduler: protocol: "{{ scheduler_protocol | default('http') }}" + dir: + become: "{{ scheduler_dir_become | default(false) }}" + confdir: "{{ config_root_dir }}/scheduler" + basePort: 14001 grpc: + basePort: 13001 tls: "{{ 
scheduler_grpc_tls | default(false) }}" maxPeek: "{{ scheduler_max_peek | default(128) }}" + heap: "{{ scheduler_heap | default('2g') }}" + arguments: "{{ scheduler_arguments | default('') }}" + instances: "{{ groups['schedulers'] | length }}" + username: "{{ scheduler_username | default('scheduler.user') }}" + password: "{{ scheduler_password | default('scheduler.pass') }}" + akka: + provider: cluster + cluster: + basePort: 25520 + host: "{{ groups['schedulers'] | map('extract', hostvars, 'ansible_host') | list }}" + bindPort: 3551 + # at this moment all schedulers are seed nodes + seedNodes: "{{ groups['schedulers'] | map('extract', hostvars, 'ansible_host') | list }}" + loglevel: "{{ scheduler_loglevel | default(whisk_loglevel) | default('INFO') }}" + extraEnv: "{{ scheduler_extraEnv | default({}) }}" + dataManagementService: + retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}" + inProgressJobRetentionSecond: "{{ scheduler_inProgressJobRetentionSecond | default('20 seconds') }}" + managedFraction: "{{ scheduler_managed_fraction | default(1.0 - (scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}" + blackboxFraction: "{{ scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction) }}" queueManager: maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}" maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}" + queue: + # the queue's state Running timeout, e.g. if have no activation comes into queue when Running, the queue state will be changed from Running to Idle and delete the decision algorithm actor + idleGrace: "{{ scheduler_queue_idleGrace | default('20 seconds') }}" + # the queue's state Idle timeout, e.g. if have no activation comes into queue when Idle, the queue state will be changed from Idle to Removed + stopGrace: "{{ scheduler_queue_stopGrace | default('20 seconds') }}" + # the queue's state Paused timeout, e.g. if have no activation comes into queue when Paused, the queue state will be changed from Paused to Removed + flushGrace: "{{ scheduler_queue_flushGrace | default('60 seconds') }}" + gracefulShutdownTimeout: "{{ scheduler_queue_gracefulShutdownTimeout | default('5 seconds') }}" + maxRetentionSize: "{{ scheduler_queue_maxRetentionSize | default(10000) }}" + maxRetentionMs: "{{ scheduler_queue_maxRetentionMs | default(60000) }}" + maxBlackboxRetentionMs: "{{ scheduler_queue_maxBlackboxRetentionMs | default(300000) }}" + throttlingFraction: "{{ scheduler_queue_throttlingFraction | default(0.9) }}" + durationBufferSize: "{{ scheduler_queue_durationBufferSize | default(10) }}" + deployment_ignore_error: "{{ scheduler_deployment_ignore_error | default('False') }}" dataManagementService: retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}" diff --git a/ansible/openwhisk.yml b/ansible/openwhisk.yml index 79d0b4d751f..f832b62fc25 100644 --- a/ansible/openwhisk.yml +++ b/ansible/openwhisk.yml @@ -20,12 +20,17 @@ # playbook (currently cloudant.yml or couchdb.yml). # It assumes that wipe.yml have being deployed at least once. 
+- import_playbook: etcd.yml + when: enable_scheduler - import_playbook: kafka.yml when: not lean - import_playbook: controller.yml +- import_playbook: scheduler.yml + when: enable_scheduler + - import_playbook: invoker.yml when: not lean diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 931d6576d50..8c99dc9403a 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -309,6 +309,21 @@ env: "{{ env | combine(mongodb_env) }}" when: db.artifact_store.backend == "MongoDB" +- name: setup scheduler env + set_fact: + scheduler_env: + "CONFIG_whisk_etcd_hosts": "{{ etcd_connect_string }}" + "CONFIG_whisk_etcd_lease_timeout": "{{ etcd.lease.timeout }}" + "CONFIG_whisk_etcd_pool_threads": "{{ etcd.pool_threads }}" + "CONFIG_whisk_scheduler_grpc_tls": "{{ scheduler.grpc.tls | default('false') | lower }}" + "CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}" + when: enable_scheduler + +- name: merge scheduler env + set_fact: + env: "{{ env | combine(scheduler_env) }}" + when: enable_scheduler + - name: populate volumes for controller set_fact: controller_volumes: diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index 28d8ead3796..fd837ce3e6e 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -328,6 +328,21 @@ env: "{{ env | combine(mongodb_env) }}" when: db.artifact_store.backend == "MongoDB" +- name: setup scheduler env + set_fact: + scheduler_env: + "CONFIG_whisk_etcd_hosts": "{{ etcd_connect_string }}" + "CONFIG_whisk_etcd_lease_timeout": "{{ etcd.lease.timeout }}" + "CONFIG_whisk_etcd_pool_threads": "{{ etcd.pool_threads }}" + "CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{ scheduler.dataManagementService.retryInterval }}" + "CONFIG_whisk_invoker_containerCreation_maxPeek": "{{ invoker.container.creationMaxPeek }}" + when: enable_scheduler + +- name: merge scheduler env + set_fact: + env: "{{ env | combine(scheduler_env) }}" + when: enable_scheduler + - name: include plugins include_tasks: "{{ inv_item }}.yml" with_items: "{{ invoker_plugins | default([]) }}" diff --git a/ansible/roles/schedulers/tasks/clean.yml b/ansible/roles/schedulers/tasks/clean.yml new file mode 100644 index 00000000000..20bb99633d6 --- /dev/null +++ b/ansible/roles/schedulers/tasks/clean.yml @@ -0,0 +1,24 @@ +--- +# Remove scheduler containers. + +- name: get scheduler name + set_fact: + scheduler_name: "{{ name_prefix ~ host_group.index(inventory_hostname) }}" + +- name: remove scheduler + docker_container: + name: "{{ scheduler_name }}" + state: absent + ignore_errors: "True" + +- name: remove scheduler log directory + file: + path: "{{ whisk_logs_dir }}/{{ scheduler_name }}" + state: absent + become: "{{ logs.dir.become }}" + +- name: remove scheduler conf directory + file: + path: "{{ scheduler.confdir }}/{{ scheduler_name }}" + state: absent + become: "{{ scheduler.dir.become }}" diff --git a/ansible/roles/schedulers/tasks/deploy.yml b/ansible/roles/schedulers/tasks/deploy.yml new file mode 100644 index 00000000000..7a976654e59 --- /dev/null +++ b/ansible/roles/schedulers/tasks/deploy.yml @@ -0,0 +1,339 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more contributor +# license agreements; and to You under the Apache License, Version 2.0. 
+---
+# This role will install the scheduler in group 'schedulers' in the environment
+# inventory
+
+- import_tasks: docker_login.yml
+
+- name: get scheduler name and index
+  set_fact:
+    scheduler_name: "{{ name_prefix ~ host_group.index(inventory_hostname) }}"
+    scheduler_index:
+      "{{ (scheduler_index_base|int) + host_group.index(inventory_hostname) }}"
+
+- name: "pull the {{ docker.image.tag }} image of scheduler"
+  shell: "docker pull {{docker_registry}}{{ docker.image.prefix }}/scheduler:{{docker.image.tag}}"
+  when: docker_registry != ""
+  register: result
+  until: (result.rc == 0)
+  retries: "{{ docker.pull.retries }}"
+  delay: "{{ docker.pull.delay }}"
+
+- name: ensure scheduler log directory is created with permissions
+  file:
+    path: "{{ whisk_logs_dir }}/{{ scheduler_name }}"
+    state: directory
+    mode: 0777
+  become: "{{ logs.dir.become }}"
+
+# We need to create the file with proper permissions because the dir creation above
+# does not result in a dir with full permissions in Docker Machine, especially with macOS mounts
+- name: ensure scheduler log file is created with permissions
+  file:
+    path: "{{ whisk_logs_dir }}/{{ scheduler_name }}/{{ scheduler_name }}_logs.log"
+    state: touch
+    mode: 0777
+  when: environment_type is defined and environment_type == "docker-machine"
+
+- name: ensure scheduler config directory is created with permissions
+  file:
+    path: "{{ scheduler.confdir }}/{{ scheduler_name }}"
+    state: directory
+    mode: 0777
+  become: "{{ scheduler.dir.become }}"
+
+- name: check that required databases exist
+  include_tasks: "{{ openwhisk_home }}/ansible/tasks/db/checkDb.yml"
+  vars:
+    dbName: "{{ item }}"
+    dbUser: "{{ db.credentials.scheduler.user }}"
+    dbPass: "{{ db.credentials.scheduler.pass }}"
+  with_items:
+    - "{{ db.whisk.auth }}"
+
+- name: copy jmxremote password file
+  when: jmx.enabled
+  template:
+    src: "jmxremote.password.j2"
+    dest: "{{ scheduler.confdir }}/{{ scheduler_name }}/jmxremote.password"
+    mode: 0777
+
+- name: copy jmxremote access file
+  when: jmx.enabled
+  template:
+    src: "jmxremote.access.j2"
+    dest: "{{ scheduler.confdir }}/{{ scheduler_name }}/jmxremote.access"
+    mode: 0777
+
+- name: prepare scheduler port
+  set_fact:
+    scheduler_port: "{{ scheduler.basePort + (scheduler_index | int) }}"
+    ports_to_expose:
+      - "{{ scheduler.grpc.basePort + (scheduler_index | int) }}:{{ scheduler.grpc.basePort + (scheduler_index | int) }}"
+      - "{{ scheduler.basePort + (scheduler_index | int) }}:8080"
+
+- name: expose additional ports if jmxremote is enabled
+  when: jmx.enabled
+  vars:
+    jmx_remote_port: "{{ jmx.basePortScheduler + (scheduler_index|int) }}"
+    jmx_remote_rmi_port:
+      "{{ jmx.rmiBasePortScheduler + (scheduler_index|int) }}"
+  set_fact:
+    ports_to_expose: >-
+      {{ ports_to_expose }} +
+      [ '{{ jmx_remote_port }}:{{ jmx_remote_port }}' ] +
+      [ '{{ jmx_remote_rmi_port }}:{{ jmx_remote_rmi_port }}' ]
+    scheduler_args: >-
+      {{ scheduler.arguments }}
+      {{ jmx.jvmCommonArgs }}
+      -Djava.rmi.server.hostname={{ ansible_host }}
+      -Dcom.sun.management.jmxremote.rmi.port={{ jmx_remote_rmi_port }}
+      -Dcom.sun.management.jmxremote.port={{ jmx_remote_port }}
+
+- name: populate environment variables for scheduler
+  set_fact:
+    env:
+      "JAVA_OPTS":
+        -Xmx{{ scheduler.heap }}
+        -XX:+CrashOnOutOfMemoryError
+        -XX:+UseGCOverheadLimit
+        -XX:ErrorFile=/logs/java_error.log
+        -XX:+HeapDumpOnOutOfMemoryError
+        -XX:HeapDumpPath=/logs
+      "SCHEDULER_OPTS": "{{ scheduler_args | default(scheduler.arguments) }}"
+      "SCHEDULER_INSTANCES": "{{ scheduler.instances }}"
+      
"JMX_REMOTE": "{{ jmx.enabled }}" + "PORT": "8080" + + "WHISK_SCHEDULER_ENDPOINTS_HOST": "{{ ansible_host }}" + "WHISK_SCHEDULER_ENDPOINTS_RPCPORT": "{{ scheduler.grpc.basePort + (scheduler_index | int)}}" + "WHISK_SCHEDULER_ENDPOINTS_AKKAPORT": "{{ scheduler.akka.cluster.basePort + (scheduler_index | int) }}" + "CONFIG_whisk_scheduler_protocol": "{{ scheduler.protocol }}" + "CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}" + "CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{ scheduler.dataManagementService.retryInterval }}" + "CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetentionSecond }}" + "CONFIG_whisk_scheduler_queueManager_maxSchedulingTime": "{{ scheduler.queueManager.maxSchedulingTime }}" + "CONFIG_whisk_scheduler_queueManager_maxRetriesToGetQueue": "{{ scheduler.queueManager.maxRetriesToGetQueue }}" + "CONFIG_whisk_scheduler_queue_idleGrace": "{{ scheduler.queue.idleGrace }}" + "CONFIG_whisk_scheduler_queue_stopGrace": "{{ scheduler.queue.stopGrace }}" + "CONFIG_whisk_scheduler_queue_flushGrace": "{{ scheduler.queue.flushGrace }}" + "CONFIG_whisk_scheduler_queue_gracefulShutdownTimeout": "{{ scheduler.queue.gracefulShutdownTimeout }}" + "CONFIG_whisk_scheduler_queue_maxRetentionSize": "{{ scheduler.queue.maxRetentionSize }}" + "CONFIG_whisk_scheduler_queue_maxRetentionMs": "{{ scheduler.queue.maxRetentionMs }}" + "CONFIG_whisk_scheduler_queue_throttlingFraction": "{{ scheduler.queue.throttlingFraction }}" + "CONFIG_whisk_scheduler_queue_durationBufferSize": "{{ scheduler.queue.durationBufferSize }}" + "CONFIG_whisk_durationChecker_timeWindow": "{{ durationChecker.timeWindow }}" + + "TZ": "{{ docker.timezone }}" + + "KAFKA_HOSTS": "{{ kafka_connect_string }}" + "CONFIG_whisk_kafka_replicationFactor": + "{{ kafka.replicationFactor | default() }}" + "CONFIG_whisk_kafka_topics_scheduler_retentionBytes": + "{{ kafka_topics_scheduler_retentionBytes | default() }}" + "CONFIG_whisk_kafka_topics_scheduler_retentionMs": + "{{ kafka_topics_scheduler_retentionMS | default() }}" + "CONFIG_whisk_kafka_topics_scheduler_segmentBytes": + "{{ kafka_topics_scheduler_segmentBytes | default() }}" + "CONFIG_whisk_kafka_topics_creationAck_retentionBytes": + "{{ kafka_topics_creationAck_retentionBytes | default() }}" + "CONFIG_whisk_kafka_topics_creationAck_retentionMs": + "{{ kafka_topics_creationAck_retentionMS | default() }}" + "CONFIG_whisk_kafka_topics_creationAck_segmentBytes": + "{{ kafka_topics_creationAck_segmentBytes | default() }}" + "CONFIG_whisk_kafka_topics_prefix": + "{{ kafka.topicsPrefix }}" + "CONFIG_whisk_kafka_topics_userEvent_prefix": + "{{ kafka.topicsUserEventPrefix }}" + "CONFIG_whisk_kafka_common_securityProtocol": + "{{ kafka.protocol }}" + "CONFIG_whisk_kafka_common_sslTruststoreLocation": + "/conf/{{ kafka.ssl.keystore.name }}" + "CONFIG_whisk_kafka_common_sslTruststorePassword": + "{{ kafka.ssl.keystore.password }}" + "CONFIG_whisk_kafka_common_sslKeystoreLocation": + "/conf/{{ kafka.ssl.keystore.name }}" + "CONFIG_whisk_kafka_common_sslKeystorePassword": + "{{ kafka.ssl.keystore.password }}" + "ZOOKEEPER_HOSTS": "{{ zookeeper_connect_string }}" + + "LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}" + + "CONFIG_whisk_couchdb_protocol": "{{ db.protocol }}" + "CONFIG_whisk_couchdb_host": "{{ db.host }}" + "CONFIG_whisk_couchdb_port": "{{ db.port }}" + "CONFIG_whisk_couchdb_username": "{{ db.credentials.scheduler.user }}" + "CONFIG_whisk_couchdb_password": "{{ db.credentials.scheduler.pass }}" + 
"CONFIG_whisk_couchdb_provider": "{{ db.provider }}" + "CONFIG_whisk_couchdb_databases_WhiskAuth": "{{ db.whisk.auth }}" + "CONFIG_whisk_couchdb_databases_WhiskEntity": "{{ db.whisk.actions }}" + "CONFIG_whisk_couchdb_databases_WhiskActivation": "{{ db.whisk.activations }}" + "CONFIG_whisk_db_actionsDdoc": "{{ db_whisk_actions_ddoc | default() }}" + "CONFIG_whisk_db_activationsDdoc": "{{ db_whisk_activations_ddoc | default() }}" + "CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}" + "CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}" + + "CONFIG_whisk_memory_min": "{{ limit_action_memory_min | default() }}" + "CONFIG_whisk_memory_max": "{{ limit_action_memory_max | default() }}" + "CONFIG_whisk_memory_std": "{{ limit_action_memory_std | default() }}" + + "CONFIG_whisk_timeLimit_min": "{{ limit_action_time_min | default() }}" + "CONFIG_whisk_timeLimit_max": "{{ limit_action_time_max | default() }}" + "CONFIG_whisk_timeLimit_std": "{{ limit_action_time_std | default() }}" + + "RUNTIMES_MANIFEST": "{{ runtimesManifest | to_json }}" + "CONFIG_whisk_runtimes_defaultImagePrefix": + "{{ runtimes_default_image_prefix | default() }}" + "CONFIG_whisk_runtimes_defaultImageTag": + "{{ runtimes_default_image_tag | default() }}" + "CONFIG_whisk_runtimes_bypassPullForLocalImages": + "{{ runtimes_bypass_pull_for_local_images | default() | lower }}" + "CONFIG_whisk_runtimes_localImagePrefix": + "{{ runtimes_local_image_prefix | default() }}" + + "METRICS_KAMON": "{{ metrics.kamon.enabled | default(false) | lower }}" + "METRICS_KAMON_TAGS": "{{ metrics.kamon.tags | default() | lower }}" + "METRICS_LOG": "{{ metrics.log.enabled | default(false) | lower }}" + + "CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}" + "CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}" + + "CONFIG_whisk_fraction_managedFraction": + "{{ scheduler.managedFraction }}" + "CONFIG_whisk_fraction_blackboxFraction": + "{{ scheduler.blackboxFraction }}" + + "CONFIG_logback_log_level": "{{ scheduler.loglevel }}" + + "CONFIG_whisk_transactions_header": "{{ transactions.header }}" + + "CONFIG_whisk_etcd_hosts": "{{ etcd_connect_string }}" + "CONFIG_whisk_etcd_lease_timeout": "{{ etcd.lease.timeout }}" + "CONFIG_whisk_etcd_pool_threads": "{{ etcd.pool_threads }}" + "CONFIG_whisk_cluster_name": "{{ whisk.cluster_name | lower }}" + + "CONFIG_whisk_scheduler_username": "{{ scheduler.username }}" + "CONFIG_whisk_scheduler_password": "{{ scheduler.password }}" + + +- name: merge extra env variables + set_fact: + env: "{{ env | combine(scheduler.extraEnv) }}" + +- name: populate volumes for scheduler + set_fact: + scheduler_volumes: + - "{{ whisk_logs_dir }}/{{ scheduler_name }}:/logs" + - "{{ scheduler.confdir }}/{{ scheduler_name }}:/conf" + +- name: setup elasticsearch activation store env + set_fact: + elastic_env: + "CONFIG_whisk_activationStore_elasticsearch_protocol": "{{ db.elasticsearch.protocol}}" + "CONFIG_whisk_activationStore_elasticsearch_hosts": "{{ elasticsearch_connect_string }}" + "CONFIG_whisk_activationStore_elasticsearch_indexPattern": "{{ db.elasticsearch.index_pattern }}" + "CONFIG_whisk_activationStore_elasticsearch_username": "{{ db.elasticsearch.auth.admin.username }}" + "CONFIG_whisk_activationStore_elasticsearch_password": "{{ db.elasticsearch.auth.admin.password }}" + "CONFIG_whisk_spi_ActivationStoreProvider": "org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStoreProvider" + when: db.activation_store.backend == 
"ElasticSearch" + +- name: merge elasticsearch activation store env + set_fact: + env: "{{ env | combine(elastic_env) }}" + when: db.activation_store.backend == "ElasticSearch" + +- name: check if coverage collection is enabled + set_fact: + coverage_enabled: false + when: coverage_enabled is undefined + +- name: ensure scheduler coverage directory is created with permissions + file: + path: "{{ coverage_logs_dir }}/scheduler/{{ item }}" + state: directory + mode: 0777 + with_items: + - scheduler + - common + become: "{{ logs.dir.become }}" + when: coverage_enabled + +- name: extend scheduler volume for coverage + set_fact: + scheduler_volumes: "{{ scheduler_volumes|default({}) + [coverage_logs_dir+'/scheduler:/coverage'] }}" + when: coverage_enabled + +- name: include plugins + include_tasks: "{{ item }}.yml" + with_items: "{{ scheduler_plugins | default([]) }}" + +- name: Judge current scheduler whether deployed + shell: echo $(docker ps | grep {{ scheduler_name }} | wc -l) + register: schedulerDeployed + when: zeroDowntimeDeployment.enabled == true + +- name: disable scheduler{{ groups['schedulers'].index(inventory_hostname) }} before redeploy scheduler + uri: + url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port }}/disable" + validate_certs: no + method: POST + status_code: 200 + user: "{{ scheduler.username }}" + password: "{{ scheduler.password }}" + force_basic_auth: yes + ignore_errors: "{{ scheduler.deployment_ignore_error }}" + when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0" + +- name: wait until all queue and create queue task is finished before redeploy scheduler when using apicall solution or half solution + uri: + url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port }}/queue/total" + validate_certs: no + return_content: yes + user: "{{ scheduler.username }}" + password: "{{ scheduler.password }}" + force_basic_auth: yes + register: totalQueue + until: totalQueue.content == "0" + retries: 180 + delay: 5 + when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0" + ignore_errors: "{{ scheduler.deployment_ignore_error }}" + +- name: wait until all queue and create queue task is finished before redeploy scheduler using sleep solution + shell: sleep 120s + when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0" and zerodowntimeDeployment.solution == 'sleep' + +- name: (re)start scheduler + docker_container: + name: "{{ scheduler_name }}" + image: + "{{docker_registry~docker.image.prefix}}/scheduler:{{ 'cov' if (coverage_enabled) else docker.image.tag }}" + state: started + recreate: true + restart_policy: "{{ docker.restart.policy }}" + hostname: "{{ scheduler_name }}" + env: "{{ env }}" + volumes: "{{ scheduler_volumes }}" + ports: "{{ ports_to_expose }}" + command: + /bin/sh -c + "exec /init.sh {{ scheduler_index }} + >> /logs/{{ scheduler_name }}_logs.log 2>&1" + +- name: wait until the Scheduler in this host is up and running + uri: + url: + "{{scheduler.protocol}}://{{ansible_host}}:{{scheduler_port}}/ping" + validate_certs: "no" + register: result + until: result.status == 200 + retries: 12 + delay: 5 + +- name: create scheduler jmx.yml + template: + src: "{{ openwhisk_home }}/ansible/roles/schedulers/templates/jmx.yml.j2" + dest: "{{ scheduler.confdir }}/jmx.yml" + ignore_errors: True + when: scheduler_index | int + 1 == groups['schedulers'] | length or ansible_host != hostvars[groups['schedulers'][scheduler_index | int + 1 ]]['ansible_host'] diff --git 
a/ansible/roles/schedulers/tasks/join_akka_cluster.yml b/ansible/roles/schedulers/tasks/join_akka_cluster.yml new file mode 100644 index 00000000000..375a549521b --- /dev/null +++ b/ansible/roles/schedulers/tasks/join_akka_cluster.yml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more contributor +# license agreements; and to You under the Apache License, Version 2.0. +--- +# +# Scheduler 'plugin' that will add the items necessary to the scheduler +# environment to cause the scheduler to join a specified akka cluster +# + +- name: add akka port to ports_to_expose + set_fact: + ports_to_expose: >- + {{ ports_to_expose }} + + [ "{{ (scheduler.akka.cluster.basePort + (scheduler_index | int)) }}:" + + "{{ scheduler.akka.cluster.bindPort }}" ] + +- name: add seed nodes to scheduler environment + set_fact: + env: >- + {{ env | combine({ + 'CONFIG_akka_cluster_seedNodes_' ~ seedNode.0: + 'akka://scheduler-actor-system@'~seedNode.1~':'~(scheduler.akka.cluster.basePort+seedNode.0) + }) }} + with_indexed_items: "{{ scheduler.akka.cluster.seedNodes }}" + loop_control: + loop_var: seedNode + +- name: Add akka environment to scheduler environment + vars: + akka_env: + "CONFIG_akka_actor_provider": "{{ scheduler.akka.provider }}" + "CONFIG_akka_remote_artery_canonical_hostname": + "{{ scheduler.akka.cluster.host[(scheduler_index | int)] }}" + "CONFIG_akka_remote_artery_canonical_port": + "{{ scheduler.akka.cluster.basePort + (scheduler_index | int) }}" + "CONFIG_akka_remote_artery_bind_port": + "{{ scheduler.akka.cluster.bindPort }}" + set_fact: + env: "{{ env | combine(akka_env) }}" diff --git a/ansible/roles/schedulers/tasks/main.yml b/ansible/roles/schedulers/tasks/main.yml new file mode 100644 index 00000000000..26f7f356a29 --- /dev/null +++ b/ansible/roles/schedulers/tasks/main.yml @@ -0,0 +1,10 @@ +--- +# This role will install scheduler in group 'schedulers' in the environment inventory +# In deploy mode it will deploy schedulers. +# In clean mode it will remove the scheduler containers. 
+
+- import_tasks: deploy.yml
+  when: mode == "deploy"
+
+- import_tasks: clean.yml
+  when: mode == "clean"
diff --git a/ansible/roles/schedulers/templates/jmx.yml.j2 b/ansible/roles/schedulers/templates/jmx.yml.j2
new file mode 100644
index 00000000000..c66802ee3e1
--- /dev/null
+++ b/ansible/roles/schedulers/templates/jmx.yml.j2
@@ -0,0 +1,25 @@
+collects:
+{% set index = groups['schedulers'].index(inventory_hostname) %}
+{% set ip = hostvars[groups['schedulers'][groups['schedulers'].index(inventory_hostname) | int]]['ansible_host'] %}
+{% for i in range(0,index+1)|reverse if hostvars[groups['schedulers'][i]]['ansible_host'] == ip %}
+  - {{ hostvars[groups['schedulers'][i]]['inventory_hostname'] }}
+{% endfor %}
+
+rules:
+{% for i in range(0,index+1)|reverse if hostvars[groups['schedulers'][i]]['ansible_host'] == ip %}
+  - name: {{ hostvars[groups['schedulers'][i]]['inventory_hostname'] }}
+    metrics:
+    - kafka.producer:type=producer-metrics,client-id=* request-latency-avg,request-latency-max,request-rate,response-rate,incoming-byte-rate,outgoing-byte-rate,connection-count,connection-creation-rate,connection-close-rate,io-ratio,io-time-ns-avg,io-wait-ratio,select-rate,io-wait-time-ns-avg client-id
+    - kafka.producer:type=producer-node-metrics,client-id=*,node-id=* request-rate,response-rate,request-latency-max,request-latency-avg,incoming-byte-rate,request-size-avg,outgoing-byte-rate,request-size-max client-id
+    - kafka.producer:type=producer-topic-metrics,client-id=*,topic=* record-retry-rate,record-send-rate,compression-rate,byte-rate,record-error-rate client-id
+    - kafka.consumer:type=consumer-metrics,client-id=* connection-creation-rate,response-rate,select-rate,connection-count,network-io-rate,io-ratio,io-wait-time-ns-avg,io-wait-ratio,outgoing-byte-rate,request-size-max,io-time-ns-avg,request-rate,incoming-byte-rate,connection-close-rate,request-size-avg client-id
+    - kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=* bytes-consumed-rate,records-consumed-rate,fetch-size-max,fetch-size-avg,records-per-request-avg client-id
+    - kafka.consumer:type=consumer-node-metrics,client-id=*,node-id=* request-rate,response-rate,request-latency-max,request-latency-avg,incoming-byte-rate,request-size-avg,outgoing-byte-rate,request-size-max client-id
+    - kafka.consumer:type=consumer-coordinator-metrics,client-id=* join-time-max,commit-latency-avg,sync-time-avg,join-rate,assigned-partitions,sync-rate,commit-rate,last-heartbeat-seconds-ago,heartbeat-rate,commit-latency-max,join-time-avg,sync-time-max,heartbeat-response-time-max client-id
+    jvmPrefix: kafka.jvm
+    jmxUrl: "service:jmx:rmi:///jndi/rmi://{{ ip }}:{{ jmx.basePortScheduler + i }}/jmxrmi"
+    jmxUsername: "{{ jmx.user }}"
+    jmxPassword: "{{ jmx.pass }}"
+
+{% endfor %}
+intervalSec: 10
diff --git a/ansible/scheduler.yml b/ansible/scheduler.yml
new file mode 100644
index 00000000000..cb88c4f66f7
--- /dev/null
+++ b/ansible/scheduler.yml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor
+# license agreements; and to You under the Apache License, Version 2.0.
+---
+# This playbook deploys OpenWhisk schedulers.
+
+- hosts: schedulers
+  vars:
+    #
+    # host_group - usually "{{ groups['...'] }}" where '...' is what was used
+    # for 'hosts' above. The hostname of each host will be looked up in this
+    # group to assign a zero-based index. That index will be used in concert
+    # with 'name_prefix' below to assign a host/container name.
+    host_group: "{{ groups['schedulers'] }}"
+    #
+    # name_prefix - a unique prefix for this set of schedulers. The prefix
+    # will be used in combination with an index (determined using
+    # 'host_group' above) to name host/schedulers.
+    name_prefix: "scheduler"
+    #
+    # scheduler_index_base - the deployment process allocates host docker
+    # ports to individual schedulers based on their indices. This is an
+    # additional offset to prevent collisions between different scheduler
+    # groups. Usually 0 if only one group is being deployed, otherwise
+    # something like "{{ groups['firstschedulergroup']|length }}"
+    scheduler_index_base: 0
+    #
+    # select which additional capabilities (from the schedulers role) need
+    # to be added to the scheduler. Plugins will override default
+    # configuration settings. (Plugins are found in the
+    # 'roles/schedulers/tasks' directory for now.)
+    scheduler_plugins:
+      # Join an akka cluster rather than running standalone akka
+      - "join_akka_cluster"
+
+  serial: '1'
+  roles:
+    - schedulers
diff --git a/ansible/tasks/initdb.yml b/ansible/tasks/initdb.yml
index 6f57019fcf5..6071fe36d28 100644
--- a/ansible/tasks/initdb.yml
+++ b/ansible/tasks/initdb.yml
@@ -29,6 +29,7 @@
     readers:
     - "{{ db.credentials.controller.user }}"
     - "{{ db.credentials.invoker.user }}"
+    - "{{ db.credentials.scheduler.user }}"
 
 - include_tasks: db/recreateDoc.yml
   vars:
diff --git a/ansible/tasks/wipeDatabase.yml b/ansible/tasks/wipeDatabase.yml
index da4c9e92431..a4688948ec4 100644
--- a/ansible/tasks/wipeDatabase.yml
+++ b/ansible/tasks/wipeDatabase.yml
@@ -30,6 +30,7 @@
     readers:
     - "{{ db.credentials.controller.user }}"
     - "{{ db.credentials.invoker.user }}"
+    - "{{ db.credentials.scheduler.user }}"
 
     writers:
     - "{{ db.credentials.controller.user }}"
@@ -46,6 +47,7 @@
     writers:
     - "{{ db.credentials.controller.user }}"
     - "{{ db.credentials.invoker.user }}"
+    - "{{ db.credentials.scheduler.user }}"
 
 - include_tasks: recreateViews.yml
   when: withViews == True
diff --git a/ansible/templates/db_local.ini.j2 b/ansible/templates/db_local.ini.j2
index c94ab631b0e..857f58a099c 100644
--- a/ansible/templates/db_local.ini.j2
+++ b/ansible/templates/db_local.ini.j2
@@ -13,3 +13,7 @@ db_password={{ lookup('env', 'OW_DB_CONTROLLER_PASSWORD') | default('some_contro
 [invoker]
 db_username={{ lookup('env', 'OW_DB_INVOKER_USERNAME') | default(db_prefix + 'invoker0', true) }}
 db_password={{ lookup('env', 'OW_DB_INVOKER_PASSWORD') | default('some_invoker_passw0rd', true) }}
+
+[scheduler]
+db_username={{ lookup('env', 'OW_DB_SCHEDULER_USERNAME') | default(db_prefix + 'scheduler0', true) }}
+db_password={{ lookup('env', 'OW_DB_SCHEDULER_PASSWORD') | default('some_scheduler_passw0rd', true) }}
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 988a23655b8..ccdfc8a567a 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -40,6 +40,10 @@ dependencies {
     compile "com.typesafe.akka:akka-actor_${gradle.scala.depVersion}:${gradle.akka.version}"
     compile "com.typesafe.akka:akka-stream_${gradle.scala.depVersion}:${gradle.akka.version}"
     compile "com.typesafe.akka:akka-slf4j_${gradle.scala.depVersion}:${gradle.akka.version}"
+    compile "com.typesafe.akka:akka-cluster_${gradle.scala.depVersion}:${gradle.akka.version}"
+    compile "com.typesafe.akka:akka-cluster-metrics_${gradle.scala.depVersion}:${gradle.akka.version}"
+    compile "com.typesafe.akka:akka-cluster-tools_${gradle.scala.depVersion}:${gradle.akka.version}"
+    compile 
"com.typesafe.akka:akka-distributed-data_${gradle.scala.depVersion}:${gradle.akka.version}" compile "com.typesafe.akka:akka-http-core_${gradle.scala.depVersion}:${gradle.akka_http.version}" compile "com.typesafe.akka:akka-http-spray-json_${gradle.scala.depVersion}:${gradle.akka_http.version}" diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index ff5f8e2af91..ba2cf643b53 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -36,6 +36,11 @@ akka.http { max-connections = 128 max-open-requests = 1024 } + + server { + preview.enable-http2 = on + parsing.illegal-header-warnings = off + } } #kamon related configuration @@ -180,6 +185,13 @@ whisk { # max-message-bytes is defined programatically as ${whisk.activation.kafka.payload.max} + # ${whisk.activation.kafka.serdes-overhead}. } + creationAck { + segment-bytes = 536870912 + retention-bytes = 1073741824 + retention-ms = 3600000 + # max-message-bytes is defined programatically as ${whisk.activation.kafka.payload.max} + + # ${whisk.activation.kafka.serdes-overhead}. + } health { segment-bytes = 536870912 retention-bytes = 1073741824 @@ -197,6 +209,11 @@ whisk { retention-bytes = 1073741824 retention-ms = 3600000 } + scheduler { + segment-bytes = 536870912 + retention-bytes = 1073741824 + retention-ms = 86400000 + } prefix = "" user-event { prefix = "" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index e50fbb4c473..49b97e5ec88 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -303,5 +303,5 @@ object ConfigKeys { val whiskClusterName = "whisk.cluster.name" - val dataManagementServiceRetryInterval = "whisk.scheduler.data-management-service.retryInterval" + val dataManagementServiceRetryInterval = "whisk.scheduler.data-management-service.retry-interval" } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala index 37c9ffb4a04..f606f64f8a0 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala @@ -46,7 +46,8 @@ class FPCPoolBalancer(config: WhiskConfig, extends LoadBalancer { private implicit val executionContext: ExecutionContext = actorSystem.dispatcher - private implicit val requestTimeout: Timeout = Timeout(5.seconds) + // This value is given according to the total waiting time at QueueManager for a new queue to be created. 
+ private implicit val requestTimeout: Timeout = Timeout(8.seconds) private val entityStore = WhiskEntityStore.datastore() @@ -366,7 +367,8 @@ class FPCPoolBalancer(config: WhiskConfig, // and complete the promise with a failure if necessary activationPromises .remove(aid) - .foreach(_.tryFailure(new Throwable("Activation entry has timed out, no completion or active ack received yet"))) + .foreach( + _.tryFailure(new Throwable("Activation entry has timed out, no completion or active ack received yet"))) } // Active acks that are received here are strictly from user actions - health actions are not part of diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf index f1a2ed6792f..66c07fc672e 100644 --- a/core/scheduler/src/main/resources/application.conf +++ b/core/scheduler/src/main/resources/application.conf @@ -18,6 +18,7 @@ akka { actor { + provider = cluster allow-java-serialization = off serializers { kryo = "io.altoo.akka.serialization.kryo.KryoSerializer" @@ -37,10 +38,11 @@ akka { } } - remote.netty.tcp { - send-buffer-size = 3151796b - receive-buffer-size = 3151796b - maximum-frame-size = 3151796b + remote { + artery { + enabled = on + transport = tcp + } } } diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala index b6eaed5cb84..f93a766b10d 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala @@ -19,14 +19,14 @@ package org.apache.openwhisk.core.scheduler import akka.Done import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown, Props} +import akka.http.scaladsl.Http import akka.http.scaladsl.model.{HttpRequest, HttpResponse} -import akka.util.Timeout import akka.pattern.ask +import akka.util.Timeout import com.typesafe.config.ConfigValueFactory import kamon.Kamon import org.apache.openwhisk.common.Https.HttpsConfig import org.apache.openwhisk.common._ -import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.core.WhiskConfig.{servicePort, _} import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender} import org.apache.openwhisk.core.connector._ @@ -37,28 +37,22 @@ import org.apache.openwhisk.core.etcd.EtcdType.ByteStringToString import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig} import org.apache.openwhisk.core.scheduler.container.{ContainerManager, CreationJobManager} import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl -import org.apache.openwhisk.core.scheduler.queue.{ - DurationCheckerProvider, - MemoryQueue, - QueueManager, - QueueSize, - SchedulingDecisionMaker -} +import org.apache.openwhisk.core.scheduler.queue._ import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService} +import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.grpc.ActivationServiceHandler import org.apache.openwhisk.http.BasicHttpService import org.apache.openwhisk.spi.SpiLoader import org.apache.openwhisk.utils.ExecutionContextFactory +import pureconfig.generic.auto._ import pureconfig.loadConfigOrThrow import spray.json.{DefaultJsonProtocol, _} +import scala.collection.JavaConverters import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.language.postfixOps 
import scala.util.{Failure, Success, Try} -import pureconfig.generic.auto._ - -import scala.collection.JavaConverters class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(implicit config: WhiskConfig, actorSystem: ActorSystem, @@ -274,12 +268,9 @@ object Scheduler { schedulerHost -> null, schedulerAkkaPort -> null, schedulerRpcPort -> null, - WhiskConfig.actionInvokePerMinuteLimit -> null, - WhiskConfig.actionInvokeConcurrentLimit -> null, - WhiskConfig.triggerFirePerMinuteLimit -> null) ++ + WhiskConfig.actionInvokeConcurrentLimit -> null) ++ kafkaHosts ++ zookeeperHosts ++ - wskApiHost ++ ExecManifest.requiredProperties def initKamon(instance: SchedulerInstanceId): Unit = { @@ -329,7 +320,7 @@ object Scheduler { val msgProvider = SpiLoader.get[MessagingProvider] Seq( - (topicPrefix + "scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)), + (topicPrefix + "scheduler" + instanceId.asString, "scheduler", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)), ( topicPrefix + "creationAck" + instanceId.asString, "creationAck", @@ -347,12 +338,17 @@ object Scheduler { // Create scheduler val scheduler = new Scheduler(instanceId, schedulerEndpoints) - // TODO: Add Akka-grpc handler - val httpsConfig = - if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None - - BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)(actorSystem) + Http() + .newServerAt("0.0.0.0", port = rpcPort) + .bind(scheduler.serviceHandlers) + .foreach { _ => + val httpsConfig = + if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) + else None + BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)( + actorSystem) + } case Failure(t) => abort(s"Invalid runtimes manifest: $t") } @@ -384,7 +380,7 @@ case class SchedulerStates(sid: SchedulerInstanceId, queueSize: Int, endpoints: def getRemoteRef(name: String)(implicit context: ActorRefFactory): ActorSelection = { implicit val ec = context.dispatcher - val path = s"akka//scheduler-actor-system@${endpoints.asAkkaEndpoint}/user/${name}" + val path = s"akka://scheduler-actor-system@${endpoints.asAkkaEndpoint}/user/${name}" context.actorSelection(path) } diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2 index 352fa1677b8..4f101448181 100644 --- a/tests/src/test/resources/application.conf.j2 +++ b/tests/src/test/resources/application.conf.j2 @@ -15,6 +15,9 @@ akka.http.host-connection-pool.client.idle-timeout = 90 s # Avoid system exit for test runs akka.jvm-exit-on-fatal-error = off +# Each ActorSystem binds to a free port +akka.remote.artery.canonical.port=0 + whisk { feature-flags { require-api-key-annotation = {{ whisk.feature_flags.require_api_key_annotation | default(true) }} @@ -143,11 +146,22 @@ whisk { grpc { tls = "{{ scheduler.grpc.tls | default('false') | lower }}" } + queue { + idle-grace = "{{ scheduler.queue.idleGrace | default('20 seconds') }}" + stop-grace = "{{ scheduler.queue.stopGrace | default('20 seconds') }}" + flush-grace = "{{ scheduler_queue_flushGrace | default('60 seconds') }}" + graceful-shutdown-timeout = "{{ scheduler.queue.gracefulShutdownTimeout | default('5 seconds') }}" + max-retention-size = "{{ scheduler.queue.maxRetentionSize | default(10000) }}" + max-retention-ms = "{{ scheduler.queue.maxRetentionMs | default(60000) }}" + 
throttling-fraction = "{{ scheduler.queue.throttlingFraction | default(0.9) }}" + duration-buffer-size = "{{ scheduler.queue.durationBufferSize | default(10) }}" + } queue-manager { - max-scheduling-time = "{{ scheduler.queueManager.maxSchedulingTime }}" - max-retries-to-get-queue = "{{ scheduler.queueManager.maxRetriesToGetQueue }}" + max-scheduling-time = "{{ scheduler.queueManager.maxSchedulingTime }}" + max-retries-to-get-queue = "{{ scheduler.queueManager.maxRetriesToGetQueue }}" } max-peek = "{{ scheduler.maxPeek }}" + in-progress-job-retention = "{{ scheduler.inProgressJobRetentionSecond | default('20 seconds') }}" } }