Skip to content

Commit 1d27143

Browse files
authored
Merge branch 'master' into add-missing-config
2 parents a7b3039 + cbdcfe5 commit 1d27143

File tree

32 files changed

+237
-166
lines changed

32 files changed

+237
-166
lines changed

ansible/README.md

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ It will run one more component called "scheduler" and ETCD.
157157
You can update service providers for the scheduler as follows.
158158

159159
**common/scala/src/main/resources/reference.conf**
160+
161+
If you are using ElasticSearch (recommended) then replace ```NoopDurationCheckerProvider``` with ```ElasticSearchDurationCheckerProvider``` below.
160162
```
161163
whisk.spi {
162164
ArtifactStoreProvider = org.apache.openwhisk.core.database.CouchDbStoreProvider
@@ -169,27 +171,13 @@ whisk.spi {
169171
AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective
170172
InvokerProvider = org.apache.openwhisk.core.invoker.FPCInvokerReactive
171173
InvokerServerProvider = org.apache.openwhisk.core.invoker.FPCInvokerServer
172-
DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.ElasticSearchDurationCheckerProvider
174+
DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.NoopDurationCheckerProvider
173175
}
174176
.
175177
.
176178
.
177179
```
178180

179-
#### Configure akka dispatcher for the scheduler
180-
Add a new dispatcher entry as follows.
181-
182-
**common/scala/src/main/resources/reference.conf**
183-
```
184-
lease-service-dispatcher {
185-
executor = "thread-pool-executor"
186-
type = PinnedDispatcher
187-
}
188-
.
189-
.
190-
.
191-
```
192-
193181
#### Enable the scheduler
194182
- Make sure you enable the scheduler by configuring `scheduler_enable`.
195183

ansible/group_vars/all

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ scheduler:
510510
extraEnv: "{{ scheduler_extraEnv | default({}) }}"
511511
dataManagementService:
512512
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
513-
inProgressJobRetentionSecond: "{{ scheduler_inProgressJobRetentionSecond | default('20 seconds') }}"
513+
inProgressJobRetention: "{{ scheduler_inProgressJobRetention | default('20 seconds') }}"
514514
managedFraction: "{{ scheduler_managed_fraction | default(1.0 - (scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}"
515515
blackboxFraction: "{{ scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction) }}"
516516
queueManager:

ansible/roles/cli/tasks/deploy.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
get_url:
3636
url: "{{ openwhisk_cli.remote.location }}/{{ openwhisk_cli.archive_name}}-{{ openwhisk_cli_tag }}-all.tgz"
3737
dest: "{{ nginx.confdir }}/cli_temp/{{ openwhisk_cli.archive_name }}.tgz"
38-
headers: "{{ openwhisk_cli.remote.headers | default('') }}"
38+
headers: "{{ openwhisk_cli.remote.headers | default('{}') }}"
3939
when: openwhisk_cli.installation_mode == "remote"
4040

4141
- name: "... or Copy release archive to build directory"

ansible/roles/schedulers/tasks/deploy.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
"CONFIG_whisk_scheduler_protocol": "{{ scheduler.protocol }}"
113113
"CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
114114
"CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{ scheduler.dataManagementService.retryInterval }}"
115-
"CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetentionSecond }}"
115+
"CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetention }}"
116116
"CONFIG_whisk_scheduler_queueManager_maxSchedulingTime": "{{ scheduler.queueManager.maxSchedulingTime }}"
117117
"CONFIG_whisk_scheduler_queueManager_maxRetriesToGetQueue": "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
118118
"CONFIG_whisk_scheduler_queue_idleGrace": "{{ scheduler.queue.idleGrace }}"

common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import pureconfig.generic.auto._
2727
import org.apache.openwhisk.common.{Counter, Logging, TransactionId}
2828
import org.apache.openwhisk.connector.kafka.KafkaConfiguration._
2929
import org.apache.openwhisk.core.ConfigKeys
30-
import org.apache.openwhisk.core.connector.{Message, MessageProducer}
30+
import org.apache.openwhisk.core.connector.{Message, MessageProducer, ResultMetadata}
3131
import org.apache.openwhisk.core.entity.{ByteSize, UUIDs}
3232
import org.apache.openwhisk.utils.Exceptions
3333

@@ -49,17 +49,18 @@ class KafkaProducerConnector(
4949
override def sentCount(): Long = sentCounter.cur
5050

5151
/** Sends msg to topic. This is an asynchronous operation. */
52-
override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
52+
override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
5353
implicit val transid: TransactionId = msg.transid
5454
val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
55-
val produced = Promise[RecordMetadata]()
55+
val produced = Promise[ResultMetadata]()
5656

5757
Future {
5858
blocking {
5959
try {
6060
producer.send(record, new Callback {
6161
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
62-
if (exception == null) produced.trySuccess(metadata)
62+
if (exception == null)
63+
produced.trySuccess(ResultMetadata(metadata.topic(), metadata.partition(), metadata.offset()))
6364
else produced.tryFailure(exception)
6465
}
6566
})
@@ -72,7 +73,7 @@ class KafkaProducerConnector(
7273

7374
produced.future.andThen {
7475
case Success(status) =>
75-
logging.debug(this, s"sent message: ${status.topic()}[${status.partition()}][${status.offset()}]")
76+
logging.debug(this, s"sent message: ${status.topic}[${status.partition}][${status.offset}]")
7677
sentCounter.next()
7778
case Failure(t) =>
7879
logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}")

common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@
1818
package org.apache.openwhisk.connector.lean
1919

2020
import akka.actor.ActorSystem
21+
2122
import scala.concurrent.Future
22-
import org.apache.kafka.clients.producer.RecordMetadata
23-
import org.apache.kafka.common.TopicPartition
2423
import org.apache.openwhisk.common.Counter
2524
import org.apache.openwhisk.common.Logging
26-
import org.apache.openwhisk.core.connector.Message
27-
import org.apache.openwhisk.core.connector.MessageProducer
25+
import org.apache.openwhisk.core.connector.{Message, MessageProducer, ResultMetadata}
2826

2927
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
3028
import scala.collection.mutable.Map
@@ -39,15 +37,15 @@ class LeanProducer(queues: Map[String, BlockingQueue[Array[Byte]]])(implicit log
3937
override def sentCount(): Long = sentCounter.cur
4038

4139
/** Sends msg to topic. This is an asynchronous operation. */
42-
override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
40+
override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
4341
implicit val transid = msg.transid
4442

4543
val queue = queues.getOrElseUpdate(topic, new LinkedBlockingQueue[Array[Byte]]())
4644

4745
Future {
4846
queue.put(msg.serialize.getBytes(StandardCharsets.UTF_8))
4947
sentCounter.next()
50-
new RecordMetadata(new TopicPartition(topic, 0), -1, -1, System.currentTimeMillis(), null, -1, -1)
48+
ResultMetadata(topic, 0, -1)
5149
}
5250
}
5351

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ object ConfigKeys {
302302
val schedulerMaxPeek = "whisk.scheduler.max-peek"
303303
val schedulerQueue = "whisk.scheduler.queue"
304304
val schedulerQueueManager = "whisk.scheduler.queue-manager"
305-
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
305+
val schedulerInProgressJobRetention = "whisk.scheduler.in-progress-job-retention"
306306

307307
val whiskClusterName = "whisk.cluster.name"
308308

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ trait Message {
4848
override def toString = serialize
4949
}
5050

51+
case class ResultMetadata(topic: String, partition: Int, offset: Long)
52+
5153
case class ActivationMessage(override val transid: TransactionId,
5254
action: FullyQualifiedEntityName,
5355
revision: DocRevision,

common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageProducer.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@ package org.apache.openwhisk.core.connector
1919

2020
import scala.concurrent.Future
2121

22-
import org.apache.kafka.clients.producer.RecordMetadata
23-
2422
trait MessageProducer {
2523

2624
/** Count of messages sent. */
2725
def sentCount(): Long
2826

2927
/** Sends msg to topic. This is an asynchronous operation. */
30-
def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata]
28+
def send(topic: String, msg: Message, retry: Int = 0): Future[ResultMetadata]
3129

3230
/** Closes producer. */
3331
def close(): Unit

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.LongAdder
2323

2424
import akka.actor.ActorSystem
2525
import akka.event.Logging.InfoLevel
26-
import org.apache.kafka.clients.producer.RecordMetadata
2726
import pureconfig._
2827
import pureconfig.generic.auto._
2928
import org.apache.openwhisk.common.LoggingMarkers._
@@ -189,7 +188,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
189188
/** 3. Send the activation to the invoker */
190189
protected def sendActivationToInvoker(producer: MessageProducer,
191190
msg: ActivationMessage,
192-
invoker: InvokerInstanceId): Future[RecordMetadata] = {
191+
invoker: InvokerInstanceId): Future[ResultMetadata] = {
193192
implicit val transid: TransactionId = msg.transid
194193

195194
val topic = s"${Controller.topicPrefix}invoker${invoker.toInt}"
@@ -206,7 +205,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
206205
transid.finished(
207206
this,
208207
start,
209-
s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]",
208+
s"posted to ${status.topic}[${status.partition}][${status.offset}]",
210209
logLevel = InfoLevel)
211210
case Failure(_) => transid.failed(this, start, s"error on posting to topic $topic")
212211
}

0 commit comments

Comments
 (0)