Skip to content

Commit 2078e47

Browse files
committed
Add retry to store activations.
1 parent 639c4a9 commit 2078e47

File tree

5 files changed

+141
-31
lines changed

5 files changed

+141
-31
lines changed

common/scala/src/main/resources/application.conf

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ whisk {
348348

349349
# ActivationStore related configuration
350350
# For example:
351-
# activationStore {
351+
# activation-store {
352352
# elasticsearch {
353353
# protocol = # "http" or "https"
354354
# hosts = # the hosts address of ES, can be multi hosts combined with commas, like "172.17.0.1:9200,172.17.0.2:9200,172.17.0.3:9200"
@@ -361,6 +361,15 @@ whisk {
361361
# }
362362
# }
363363

364+
activation-store {
365+
retry-config {
366+
max-tries = 3
367+
}
368+
elasticsearch {
369+
keep-alive = 13 minutes
370+
}
371+
}
372+
364373
azure-blob {
365374
# Config property when using AzureBlobAttachmentStore
366375
# whisk {

common/scala/src/main/scala/org/apache/openwhisk/core/database/Batcher.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ import akka.stream.scaladsl.{Sink, Source}
4949
* @tparam T the type to be batched
5050
* @tparam R return type of a single element after operation
5151
*/
52-
class Batcher[T, R](batchSize: Int, concurrency: Int)(operation: Seq[T] => Future[Seq[R]])(implicit
53-
system: ActorSystem,
54-
ec: ExecutionContext) {
52+
class Batcher[T, R](batchSize: Int, concurrency: Int, retry: Int)(operation: (Seq[T], Int) => Future[Seq[R]])(
53+
implicit
54+
system: ActorSystem,
55+
ec: ExecutionContext) {
5556

5657
val cm: PartialFunction[Any, CompletionStrategy] = {
5758
case Done =>
@@ -69,7 +70,7 @@ class Batcher[T, R](batchSize: Int, concurrency: Int)(operation: Seq[T] => Futur
6970
val elements = els.map(_._1)
7071
val promises = els.map(_._2)
7172

72-
val f = operation(elements)
73+
val f = operation(elements, retry)
7374
f.onComplete {
7475
case Success(results) => results.zip(promises).foreach { case (result, p) => p.success(result) }
7576
case Failure(e) => promises.foreach(_.failure(e))

common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestStore.scala

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import akka.event.Logging.ErrorLevel
2222
import akka.http.scaladsl.model._
2323
import akka.stream.scaladsl._
2424
import akka.util.ByteString
25+
2526
import scala.concurrent.Await
2627
import scala.concurrent.duration._
2728
import spray.json._
@@ -30,6 +31,7 @@ import org.apache.openwhisk.core.database.StoreUtils._
3031
import org.apache.openwhisk.core.entity.Attachments.Attached
3132
import org.apache.openwhisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID}
3233
import org.apache.openwhisk.http.Messages
34+
import pureconfig.loadConfigOrThrow
3335

3436
import scala.concurrent.Future
3537
import scala.util.Try
@@ -74,8 +76,9 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
7476
// and more documents need to be stored, then all arriving documents will be put into batches (if enabled) to avoid a long queue.
7577
private val maxOpenDbRequests = system.settings.config.getInt("akka.http.host-connection-pool.max-connections") / 2
7678

79+
private val maxRetry = loadConfigOrThrow[Int]("whisk.activation-store.retry-config.max-tries")
7780
private val batcher: Batcher[JsObject, Either[ArtifactStoreException, DocInfo]] =
78-
new Batcher(500, maxOpenDbRequests)(put(_)(TransactionId.dbBatcher))
81+
new Batcher(500, maxOpenDbRequests, maxRetry)(put(_, _)(TransactionId.dbBatcher))
7982

8083
override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
8184
val asJson = d.toDocumentRecord
@@ -137,7 +140,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
137140
transid.failed(this, start, s"[PUT] '$dbName' internal error, failure: '${failure.getMessage}'", ErrorLevel))
138141
}
139142

140-
private def put(ds: Seq[JsObject])(
143+
private def put(ds: Seq[JsObject], retry: Int)(
141144
implicit transid: TransactionId): Future[Seq[Either[ArtifactStoreException, DocInfo]]] = {
142145
val count = ds.size
143146
val start = transid.started(this, LoggingMarkers.DATABASE_BULK_SAVE, s"'$dbName' saving $count documents")
@@ -166,10 +169,16 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
166169
}
167170
}
168171

169-
reportFailure(
170-
f,
171-
failure =>
172-
transid.failed(this, start, s"[PUT] '$dbName' internal error, failure: '${failure.getMessage}'", ErrorLevel))
172+
f.recoverWith {
173+
case t: ArtifactStoreException => Future.failed(t)
174+
case _ if retry > 0 =>
175+
transid.failed(this, start, s"failed to store an activation to CouchDB")
176+
put(ds, retry - 1)
177+
case t =>
178+
transid
179+
.failed(this, start, s"[PUT] '$dbName' internal error, failure: '${t.getMessage}'", ErrorLevel)
180+
Future.failed(t)
181+
}
173182
}
174183

175184
override protected[database] def del(doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = {

common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,38 @@
1717

1818
package org.apache.openwhisk.core.database.elasticsearch
1919

20-
import java.time.Instant
21-
import java.util.concurrent.TimeUnit
22-
23-
import scala.language.postfixOps
2420
import akka.actor.ActorSystem
2521
import akka.event.Logging.ErrorLevel
2622
import akka.http.scaladsl.model._
2723
import akka.stream.scaladsl.Flow
24+
import com.google.common.base.Throwables
2825
import com.sksamuel.elastic4s.http.search.SearchHit
2926
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
3027
import com.sksamuel.elastic4s.indexes.IndexRequest
3128
import com.sksamuel.elastic4s.searches.queries.RangeQuery
3229
import com.sksamuel.elastic4s.searches.queries.matches.MatchPhrase
30+
import org.apache.http
3331
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
32+
import org.apache.http.conn.ConnectionKeepAliveStrategy
3433
import org.apache.http.impl.client.BasicCredentialsProvider
3534
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
36-
import pureconfig.loadConfigOrThrow
37-
import pureconfig.generic.auto._
38-
import spray.json._
35+
import org.apache.http.protocol.HttpContext
3936
import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId}
4037
import org.apache.openwhisk.core.ConfigKeys
4138
import org.apache.openwhisk.core.containerpool.logging.ElasticSearchJsonProtocol._
42-
import org.apache.openwhisk.core.database._
4339
import org.apache.openwhisk.core.database.StoreUtils._
40+
import org.apache.openwhisk.core.database._
4441
import org.apache.openwhisk.core.entity._
4542
import org.apache.openwhisk.http.Messages
4643
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
44+
import pureconfig.loadConfigOrThrow
45+
import spray.json._
4746

47+
import java.time.Instant
48+
import java.util.concurrent.TimeUnit
49+
import scala.concurrent.duration.FiniteDuration
4850
import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
51+
import scala.language.postfixOps
4952
import scala.util.Try
5053

5154
case class ElasticSearchActivationStoreConfig(protocol: String,
@@ -60,8 +63,8 @@ class ElasticSearchActivationStore(
6063
useBatching: Boolean = false)(implicit actorSystem: ActorSystem, override val logging: Logging)
6164
extends ActivationStore {
6265

63-
import com.sksamuel.elastic4s.http.ElasticDsl._
6466
import ElasticSearchActivationStore.{generateIndex, httpClientCallback}
67+
import com.sksamuel.elastic4s.http.ElasticDsl._
6568

6669
private implicit val executionContextExecutor: ExecutionContextExecutor = actorSystem.dispatcher
6770

@@ -74,8 +77,9 @@ class ElasticSearchActivationStore(
7477
private val esType = "_doc"
7578
private val maxOpenDbRequests = actorSystem.settings.config
7679
.getInt("akka.http.host-connection-pool.max-connections") / 2
80+
private val maxRetry = loadConfigOrThrow[Int]("whisk.activation-store.retry-config.max-tries")
7781
private val batcher: Batcher[IndexRequest, Either[ArtifactStoreException, DocInfo]] =
78-
new Batcher(500, maxOpenDbRequests)(doStore(_)(TransactionId.dbBatcher))
82+
new Batcher(500, maxOpenDbRequests, maxRetry)(doStore(_, _)(TransactionId.dbBatcher))
7983

8084
private val minStart = 0L
8185
private val maxStart = Instant.now.toEpochMilli + TimeUnit.DAYS.toMillis(365 * 100) //100 years from now
@@ -128,10 +132,10 @@ class ElasticSearchActivationStore(
128132
throw PutException("error on 'put'")
129133
}
130134

131-
reportFailure(res, start, failure => s"[PUT] 'activations' internal error, failure: '${failure.getMessage}'")
135+
res
132136
}
133137

134-
private def doStore(ops: Seq[IndexRequest])(
138+
private def doStore(ops: Seq[IndexRequest], retry: Int)(
135139
implicit transid: TransactionId): Future[Seq[Either[ArtifactStoreException, DocInfo]]] = {
136140
val count = ops.size
137141
val start = transid.started(this, LoggingMarkers.DATABASE_BULK_SAVE, s"'activations' saving $count documents")
@@ -169,7 +173,20 @@ class ElasticSearchActivationStore(
169173
}
170174
}
171175

172-
reportFailure(res, start, failure => s"[PUT] 'activations' internal error, failure: '${failure.getMessage}'")
176+
res.recoverWith {
177+
case t: ArtifactStoreException => Future.failed(t)
178+
case _ if retry > 0 =>
179+
transid.failed(this, start, s"store activation to ElasticSearch failed")
180+
doStore(ops, retry - 1)
181+
case t =>
182+
transid.failed(
183+
this,
184+
start,
185+
s"[PUT] 'activations' internal error, failure: '${t.getMessage}' [${t.getClass.getSimpleName}]\n" + Throwables
186+
.getStackTraceAsString(t),
187+
ErrorLevel)
188+
Future.failed(t)
189+
}
173190
}
174191

175192
override def get(activationId: ActivationId, context: UserContext)(
@@ -426,6 +443,7 @@ object ElasticSearchActivationStore {
426443
AuthScope.ANY,
427444
new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
428445
httpClientBuilder.setDefaultCredentialsProvider(provider)
446+
httpClientBuilder.setKeepAliveStrategy(new CustomKeepAliveStrategy())
429447
}
430448
}
431449

@@ -442,3 +460,9 @@ object ElasticSearchActivationStoreProvider extends ActivationStoreProvider {
442460
actorSystem,
443461
logging)
444462
}
463+
464+
class CustomKeepAliveStrategy extends ConnectionKeepAliveStrategy {
465+
override def getKeepAliveDuration(response: http.HttpResponse, context: HttpContext): Long = {
466+
loadConfigOrThrow[FiniteDuration]("whisk.activation-store.elasticsearch.keep-alive").toMillis
467+
}
468+
}

tests/src/test/scala/org/apache/openwhisk/core/database/test/BatcherTests.scala

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,32 +54,32 @@ class BatcherTests extends FlatSpec with Matchers with WskActorSystem {
5454

5555
val transform = (i: Int) => i + 1
5656

57-
val batchOperation = LoggedFunction((els: Seq[Int]) => {
57+
val batchOperation = LoggedFunction((els: Seq[Int], retry: Int) => {
5858
batchPromises.dequeue().future.map(_ => els.map(transform))
5959
})
6060

61-
val batcher = new Batcher[Int, Int](2, 1)(batchOperation)
61+
val batcher = new Batcher[Int, Int](2, 1, 1)(batchOperation)
6262

6363
val values = 1 to 5
6464
val results = values.map(batcher.put)
6565

6666
// First "batch"
6767
retry(batchOperation.calls should have size 1, (promiseDelay.toMillis * 2).toInt)
68-
batchOperation.calls(0) should have size 1
68+
batchOperation.calls(0)._1 should have size 1
6969

7070
// Allow batch to build up
7171
resolveDelayed(ps(0))
7272

7373
// Second batch
7474
retry(batchOperation.calls should have size 2, (promiseDelay.toMillis * 2).toInt)
75-
batchOperation.calls(1) should have size 2
75+
batchOperation.calls(1)._1 should have size 2
7676

7777
// Allow batch to build up
7878
resolveDelayed(ps(1))
7979

8080
// Third batch
8181
retry(batchOperation.calls should have size 3, (promiseDelay.toMillis * 2).toInt)
82-
batchOperation.calls(2) should have size 2
82+
batchOperation.calls(2)._1 should have size 2
8383
ps(2).success(())
8484

8585
await(Future.sequence(results)) shouldBe values.map(transform)
@@ -90,7 +90,7 @@ class BatcherTests extends FlatSpec with Matchers with WskActorSystem {
9090
val parallel = new AtomicInteger(0)
9191
val concurrency = 2
9292

93-
val batcher = new Batcher[Int, Int](1, concurrency)(els => {
93+
val batcher = new Batcher[Int, Int](1, concurrency, 1)((els, _) => {
9494
parallel.incrementAndGet()
9595
p.future.map(_ => els)
9696
})
@@ -108,7 +108,7 @@ class BatcherTests extends FlatSpec with Matchers with WskActorSystem {
108108
}
109109

110110
it should "complete batched values with the thrown exception" in {
111-
val batcher = new Batcher[Int, Int](2, 1)(_ => Future.failed(new Exception))
111+
val batcher = new Batcher[Int, Int](2, 1, 1)((_, _) => Future.failed(new Exception))
112112

113113
val r1 = batcher.put(1)
114114
val r2 = batcher.put(2)
@@ -123,4 +123,71 @@ class BatcherTests extends FlatSpec with Matchers with WskActorSystem {
123123
an[Exception] should be thrownBy await(r3)
124124
an[Exception] should be thrownBy await(r4)
125125
}
126+
127+
it should "complete batched values with max retry limit" in {
128+
val p = Promise[Unit]()
129+
130+
val maxRetry = 3
131+
val batchSize = 1
132+
val concurrency = 1
133+
134+
var retryCount = new AtomicInteger(0)
135+
136+
def doStore(els: Seq[Int], retry: Int): Future[Seq[Int]] = {
137+
val result = if (retry > 0) {
138+
Future.failed(new Exception)
139+
} else {
140+
p.future.map(_ => els)
141+
}
142+
143+
result.recoverWith {
144+
case _ if retry > 0 =>
145+
retryCount.incrementAndGet()
146+
doStore(els, retry - 1)
147+
case e =>
148+
Future.failed(e)
149+
}
150+
151+
}
152+
val batcher = new Batcher[Int, Int](batchSize, concurrency, maxRetry)(doStore)
153+
154+
val values = List(1)
155+
val results = values.map(batcher.put)
156+
157+
p.success(())
158+
159+
await(Future.sequence(results)) shouldBe values
160+
161+
retryCount.get() shouldBe maxRetry
162+
}
163+
164+
it should "complete batched values with the thrown exception with max retry limit" in {
165+
val p = Promise[Unit]()
166+
167+
val maxRetry = 3
168+
val batchSize = 1
169+
val concurrency = 1
170+
171+
val retryCount = new AtomicInteger(0)
172+
173+
def doStore(els: Seq[Int], retry: Int): Future[Seq[Int]] = {
174+
val result = Future.failed(new Exception)
175+
176+
result.recoverWith {
177+
case _ if retry > 0 =>
178+
retryCount.incrementAndGet()
179+
doStore(els, retry - 1)
180+
case e =>
181+
Future.failed(e)
182+
}
183+
184+
}
185+
val batcher = new Batcher[Int, Int](batchSize, concurrency, maxRetry)(doStore)
186+
187+
val r1 = batcher.put(1)
188+
189+
an[Exception] should be thrownBy await(r1)
190+
191+
retryCount.get() shouldBe maxRetry
192+
}
126193
}

0 commit comments

Comments
 (0)