Skip to content

Commit 35f1a3e

Browse files
chetanmehAndy Steedtysonnorris
authored
AttachmentStore implementation based on Azure Blob (#4716)
* Initial implementation for the Azure Blob based AttachmentStore * add retry options for azure blob sdk to avoid sporadic long running requests removed redundant blob list code * add additional config for retry options Co-authored-by: Andy Steed <[email protected]> Co-authored-by: Tyson Norris <[email protected]>
1 parent 470eaf5 commit 35f1a3e

File tree

9 files changed

+502
-0
lines changed

9 files changed

+502
-0
lines changed

common/scala/build.gradle

+4
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ dependencies {
9494
exclude group: 'com.fasterxml.jackson.dataformat'
9595
}
9696
compile "com.amazonaws:aws-java-sdk-cloudfront:1.11.517"
97+
98+
compile ("com.azure:azure-storage-blob:12.6.0") {
99+
exclude group: "com.azure", module: "azure-core-test"
100+
}
97101
}
98102

99103
configurations {

common/scala/src/main/resources/application.conf

+34
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,40 @@ whisk {
335335
# }
336336
# }
337337

338+
azure-blob {
339+
# Config property when using AzureBlobAttachmentStore
340+
# whisk {
341+
# spi {
342+
# AttachmentStoreProvider = org.apache.openwhisk.core.database.azblob.AzureBlobAttachmentStoreProvider
343+
# }
344+
#}
345+
346+
# Blob container endpoint like https://foostore.blob.core.windows.net/test-ow-travis
347+
# It is of format https://<account-name>.blob.core.windows.net/<container-name>
348+
# endpoint =
349+
350+
# Storage account name
351+
# account-name =
352+
353+
# Container name within storage account used to store the blobs
354+
# container-name =
355+
356+
# Shared key credentials
357+
# https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/storage/azure-storage-blob#shared-key-credential
358+
# account-key
359+
360+
# Folder path within the container (optional)
361+
# prefix
362+
363+
retry-config {
364+
retry-policy-type = FIXED
365+
max-tries = 3
366+
try-timeout = 5 seconds
367+
retry-delay = 10 milliseconds
368+
#secondary-host = ""
369+
}
370+
}
371+
338372
# transaction ID related configuration
339373
transactions {
340374
header = "X-Request-ID"

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

+2
Original file line numberDiff line numberDiff line change
@@ -272,4 +272,6 @@ object ConfigKeys {
272272
val apacheClientConfig = "whisk.apache-client"
273273

274274
val parameterStorage = "whisk.parameter-storage"
275+
276+
val azBlob = "whisk.azure-blob"
275277
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.database.azblob
19+
20+
import akka.actor.ActorSystem
21+
import akka.event.Logging
22+
import akka.event.Logging.InfoLevel
23+
import akka.http.scaladsl.model.ContentType
24+
import akka.stream.ActorMaterializer
25+
import akka.stream.scaladsl.{Sink, Source}
26+
import akka.util.{ByteString, ByteStringBuilder}
27+
import com.azure.storage.blob.{BlobContainerAsyncClient, BlobContainerClientBuilder}
28+
import com.azure.storage.common.StorageSharedKeyCredential
29+
import com.azure.storage.common.policy.{RequestRetryOptions, RetryPolicyType}
30+
import com.typesafe.config.Config
31+
import org.apache.openwhisk.common.LoggingMarkers.{
32+
DATABASE_ATTS_DELETE,
33+
DATABASE_ATT_DELETE,
34+
DATABASE_ATT_GET,
35+
DATABASE_ATT_SAVE
36+
}
37+
import org.apache.openwhisk.common.{Logging, TransactionId}
38+
import org.apache.openwhisk.core.ConfigKeys
39+
import org.apache.openwhisk.core.database.StoreUtils.{combinedSink, reportFailure}
40+
import org.apache.openwhisk.core.database.{
41+
AttachResult,
42+
AttachmentStore,
43+
AttachmentStoreProvider,
44+
DocumentSerializer,
45+
NoDocumentException
46+
}
47+
import org.apache.openwhisk.core.entity.DocId
48+
import pureconfig._
49+
import pureconfig.generic.auto._
50+
import reactor.core.publisher.Flux
51+
52+
import scala.compat.java8.FutureConverters._
53+
import scala.concurrent.duration.FiniteDuration
54+
import scala.concurrent.{ExecutionContext, Future}
55+
import scala.reflect.ClassTag
56+
import scala.util.Success
57+
58+
case class AzBlobConfig(endpoint: String,
59+
accountKey: String,
60+
containerName: String,
61+
accountName: String,
62+
connectionString: Option[String],
63+
prefix: Option[String],
64+
retryConfig: AzBlobRetryConfig) {
65+
def prefixFor[D](implicit tag: ClassTag[D]): String = {
66+
val className = tag.runtimeClass.getSimpleName.toLowerCase
67+
prefix.map(p => s"$p/$className").getOrElse(className)
68+
}
69+
}
70+
case class AzBlobRetryConfig(retryPolicyType: RetryPolicyType,
71+
maxTries: Int,
72+
tryTimeout: FiniteDuration,
73+
retryDelay: FiniteDuration,
74+
secondaryHost: Option[String])
75+
object AzureBlobAttachmentStoreProvider extends AttachmentStoreProvider {
76+
override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
77+
logging: Logging,
78+
materializer: ActorMaterializer): AttachmentStore = {
79+
makeStore[D](actorSystem.settings.config)
80+
}
81+
82+
def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit actorSystem: ActorSystem,
83+
logging: Logging,
84+
materializer: ActorMaterializer): AttachmentStore = {
85+
val azConfig = loadConfigOrThrow[AzBlobConfig](config, ConfigKeys.azBlob)
86+
new AzureBlobAttachmentStore(createClient(azConfig), azConfig.prefixFor[D])
87+
}
88+
89+
def createClient(config: AzBlobConfig): BlobContainerAsyncClient = {
90+
val builder = new BlobContainerClientBuilder()
91+
92+
//If connection string is specified then it would have all needed info
93+
//Mostly used for testing using Azurite
94+
config.connectionString match {
95+
case Some(s) => builder.connectionString(s)
96+
case _ =>
97+
builder
98+
.endpoint(config.endpoint)
99+
.credential(new StorageSharedKeyCredential(config.accountName, config.accountKey))
100+
}
101+
102+
builder
103+
.containerName(config.containerName)
104+
.retryOptions(new RequestRetryOptions(
105+
config.retryConfig.retryPolicyType,
106+
config.retryConfig.maxTries,
107+
config.retryConfig.tryTimeout.toSeconds.toInt,
108+
config.retryConfig.retryDelay.toMillis,
109+
config.retryConfig.retryDelay.toMillis,
110+
config.retryConfig.secondaryHost.orNull))
111+
.buildAsyncClient()
112+
}
113+
}
114+
115+
class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String)(implicit system: ActorSystem,
116+
logging: Logging,
117+
materializer: ActorMaterializer)
118+
extends AttachmentStore {
119+
override protected[core] def scheme: String = "az"
120+
121+
override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
122+
123+
override protected[core] def attach(
124+
docId: DocId,
125+
name: String,
126+
contentType: ContentType,
127+
docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = {
128+
require(name != null, "name undefined")
129+
val start =
130+
transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document 'id: $docId'")
131+
val blobClient = getBlobClient(docId, name)
132+
133+
//TODO Use BlobAsyncClient#upload(Flux<ByteBuffer>, com.azure.storage.blob.models.ParallelTransferOptions, boolean)
134+
val uploadSink = Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)
135+
136+
val f = docStream.runWith(combinedSink(uploadSink))
137+
val g = f.flatMap { r =>
138+
val buff = r.uploadResult.result().compact
139+
val uf = blobClient.upload(Flux.fromArray(Array(buff.asByteBuffer)), buff.size).toFuture.toScala
140+
uf.map(_ => AttachResult(r.digest, r.length))
141+
}
142+
143+
g.foreach(_ =>
144+
transid
145+
.finished(this, start, s"[ATT_PUT] '$prefix' completed uploading attachment '$name' of document 'id: $docId'"))
146+
147+
reportFailure(
148+
g,
149+
start,
150+
failure => s"[ATT_PUT] '$prefix' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'")
151+
}
152+
153+
override protected[core] def readAttachment[T](docId: DocId, name: String, sink: Sink[ByteString, Future[T]])(
154+
implicit transid: TransactionId): Future[T] = {
155+
require(name != null, "name undefined")
156+
val start =
157+
transid.started(
158+
this,
159+
DATABASE_ATT_GET,
160+
s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
161+
val blobClient = getBlobClient(docId, name)
162+
val f = blobClient.exists().toFuture.toScala.flatMap { exists =>
163+
if (exists) {
164+
val bbFlux = blobClient.download()
165+
val rf = Source.fromPublisher(bbFlux).map(ByteString(_)).runWith(sink)
166+
rf.andThen {
167+
case Success(_) =>
168+
transid
169+
.finished(
170+
this,
171+
start,
172+
s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
173+
}
174+
} else {
175+
transid
176+
.finished(
177+
this,
178+
start,
179+
s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.",
180+
logLevel = Logging.ErrorLevel)
181+
Future.failed(NoDocumentException("Not found on 'readAttachment'."))
182+
}
183+
}
184+
185+
reportFailure(
186+
f,
187+
start,
188+
failure =>
189+
s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
190+
}
191+
192+
override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
193+
val start =
194+
transid.started(
195+
this,
196+
DATABASE_ATTS_DELETE,
197+
s"[ATTS_DELETE] deleting attachments of document 'id: $docId' with prefix ${objectKeyPrefix(docId)}")
198+
199+
var count = 0
200+
val f = Source
201+
.fromPublisher(client.listBlobsByHierarchy(objectKeyPrefix(docId)))
202+
.mapAsync(1) { b =>
203+
count += 1
204+
val startDelete =
205+
transid.started(
206+
this,
207+
DATABASE_ATT_DELETE,
208+
s"[ATT_DELETE] deleting attachment '${b.getName}' of document 'id: $docId'")
209+
client
210+
.getBlobAsyncClient(b.getName)
211+
.delete()
212+
.toFuture
213+
.toScala
214+
.map(
215+
_ =>
216+
transid.finished(
217+
this,
218+
startDelete,
219+
s"[ATT_DELETE] completed: deleting attachment '${b.getName}' of document 'id: $docId'"))
220+
.recover {
221+
case t =>
222+
transid.failed(
223+
this,
224+
startDelete,
225+
s"[ATT_DELETE] failed: deleting attachment '${b.getName}' of document 'id: $docId' error: $t")
226+
}
227+
228+
}
229+
.recover {
230+
case t =>
231+
logging.error(this, s"[ATT_DELETE] :error in delete ${t}")
232+
throw t
233+
}
234+
.runWith(Sink.seq)
235+
.map(_ => true)
236+
237+
f.foreach(
238+
_ =>
239+
transid.finished(
240+
this,
241+
start,
242+
s"[ATTS_DELETE] completed: deleting ${count} attachments of document 'id: $docId'",
243+
InfoLevel))
244+
245+
reportFailure(
246+
f,
247+
start,
248+
failure => s"[ATTS_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
249+
}
250+
251+
override protected[core] def deleteAttachment(docId: DocId, name: String)(
252+
implicit transid: TransactionId): Future[Boolean] = {
253+
val start =
254+
transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment '$name' of document 'id: $docId'")
255+
256+
val f = getBlobClient(docId, name).delete().toFuture.toScala.map(_ => true)
257+
258+
f.foreach(_ =>
259+
transid.finished(this, start, s"[ATT_DELETE] completed: deleting attachment '$name' of document 'id: $docId'"))
260+
261+
reportFailure(
262+
f,
263+
start,
264+
failure => s"[ATT_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
265+
}
266+
267+
override def shutdown(): Unit = {}
268+
269+
private def objectKey(id: DocId, name: String): String = s"$prefix/${id.id}/$name"
270+
271+
private def objectKeyPrefix(id: DocId): String =
272+
s"$prefix/${id.id}/" //must end with a slash so that ".../<package>/<action>other" does not match for "<package>/<action>"
273+
274+
private def getBlobClient(docId: DocId, name: String) =
275+
client.getBlobAsyncClient(objectKey(docId, name)).getBlockBlobAsyncClient
276+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.database.azblob
19+
20+
import akka.actor.ActorSystem
21+
import akka.stream.ActorMaterializer
22+
import com.typesafe.config.ConfigFactory
23+
import org.apache.openwhisk.common.Logging
24+
import org.apache.openwhisk.core.database.{AttachmentStore, DocumentSerializer}
25+
import org.scalatest.FlatSpec
26+
27+
import scala.reflect.ClassTag
28+
29+
trait AzureBlob extends FlatSpec {
30+
def makeAzureStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
31+
logging: Logging,
32+
materializer: ActorMaterializer): AttachmentStore = {
33+
val config = ConfigFactory.parseString(s"""
34+
|whisk {
35+
| azure-blob {
36+
| endpoint = "$endpoint"
37+
| account-name = "$accountName"
38+
| container-name = "$containerName"
39+
| account-key = "$accountKey"
40+
| prefix = $prefix
41+
| }
42+
|}""".stripMargin).withFallback(ConfigFactory.load()).resolve()
43+
AzureBlobAttachmentStoreProvider.makeStore[D](config)
44+
}
45+
46+
override protected def withFixture(test: NoArgTest) = {
47+
assume(
48+
accountKey != null,
49+
"'AZ_ACCOUNT_KEY' env not configured. Configure following " +
50+
"env variables for test to run. 'AZ_ENDPOINT', 'AZ_ACCOUNT_NAME', 'AZ_CONTAINER_NAME'")
51+
super.withFixture(test)
52+
}
53+
54+
val endpoint = System.getenv("AZ_ENDPOINT")
55+
val accountName = System.getenv("AZ_ACCOUNT_NAME")
56+
val containerName = sys.env.getOrElse("AZ_CONTAINER_NAME", "test-ow-travis")
57+
val accountKey = System.getenv("AZ_ACCOUNT_KEY")
58+
59+
def prefix: String
60+
}

0 commit comments

Comments
 (0)