Skip to content

Commit 97387f8

Browse files
Rename graph stage classes used for mongodb gridfs
1 parent 2603627 commit 97387f8

File tree

4 files changed

+17
-17
lines changed

4 files changed

+17
-17
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client: Mo
434434
val option = new GridFSUploadOptions().metadata(document)
435435

436436
val uploadStream = gridFSBucket.openUploadStream(BsonString(s"$id/$name"), name, option)
437-
val sink = AsyncStreamSink(uploadStream)
437+
val sink = MongoDBAsyncStreamSink(uploadStream)
438438

439439
val f = docStream
440440
.runWith(combinedSink(sink))
@@ -498,7 +498,7 @@ class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client: Mo
498498
val downloadStream = gridFSBucket.openDownloadStream(BsonString(s"${doc.id.id}/$attachmentName"))
499499

500500
def readStream(file: GridFSFile) = {
501-
val source = AsyncStreamSource(downloadStream)
501+
val source = MongoDBAsyncStreamSource(downloadStream)
502502
source
503503
.runWith(sink)
504504
.map { result =>

common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSink.scala renamed to common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.mongodb.scala.gridfs.{AsyncOutputStream}
3030
import scala.concurrent.{ExecutionContext, Future, Promise}
3131
import scala.util.{Failure, Success, Try}
3232

33-
class AsyncStreamSink(stream: AsyncOutputStream)(implicit ec: ExecutionContext)
33+
class MongoDBAsyncStreamSink(stream: AsyncOutputStream)(implicit ec: ExecutionContext)
3434
extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[IOResult]] {
3535
val in: Inlet[ByteString] = Inlet("AsyncStream.in")
3636

@@ -115,8 +115,8 @@ class AsyncStreamSink(stream: AsyncOutputStream)(implicit ec: ExecutionContext)
115115
}
116116
}
117117

118-
object AsyncStreamSink {
118+
object MongoDBAsyncStreamSink {
119119
def apply(stream: AsyncOutputStream)(implicit ec: ExecutionContext): Sink[ByteString, Future[IOResult]] = {
120-
Sink.fromGraph(new AsyncStreamSink(stream))
120+
Sink.fromGraph(new MongoDBAsyncStreamSink(stream))
121121
}
122122
}

common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSource.scala renamed to common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSource.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import scala.util.Success
4040
import scala.util.Try
4141
import scala.util.Failure
4242

43-
class AsyncStreamSource(stream: AsyncInputStream, chunkSize: Int)(implicit ec: ExecutionContext)
43+
class MongoDBAsyncStreamSource(stream: AsyncInputStream, chunkSize: Int)(implicit ec: ExecutionContext)
4444
extends GraphStageWithMaterializedValue[SourceShape[ByteString], Future[IOResult]] {
4545
require(chunkSize > 0, "chunkSize must be greater than 0")
4646
val out: Outlet[ByteString] = Outlet("AsyncStream.out")
@@ -96,9 +96,9 @@ class AsyncStreamSource(stream: AsyncInputStream, chunkSize: Int)(implicit ec: E
9696
}
9797
}
9898

99-
object AsyncStreamSource {
99+
object MongoDBAsyncStreamSource {
100100
def apply(stream: AsyncInputStream, chunkSize: Int = 512 * 1024)(
101101
implicit ec: ExecutionContext): Source[ByteString, Future[IOResult]] = {
102-
Source.fromGraph(new AsyncStreamSource(stream, chunkSize))
102+
Source.fromGraph(new MongoDBAsyncStreamSource(stream, chunkSize))
103103
}
104104
}

tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamGraphTests.scala renamed to tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamGraphTests.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.scalatestplus.mockito.MockitoSugar
3737
import scala.util.Random
3838

3939
@RunWith(classOf[JUnitRunner])
40-
class AsyncStreamGraphTests
40+
class MongoDBAsyncStreamGraphTests
4141
extends FlatSpec
4242
with Matchers
4343
with ScalaFutures
@@ -47,13 +47,13 @@ class AsyncStreamGraphTests
4747

4848
implicit val mat = ActorMaterializer()
4949

50-
behavior of "AsyncStreamSource"
50+
behavior of "MongoDBAsyncStreamSource"
5151

5252
it should "read all bytes" in {
5353
val bytes = randomBytes(4000)
5454
val asyncStream = AsyncStreamHelper.toAsyncInputStream(bytes)
5555

56-
val readStream = AsyncStreamSource(asyncStream, 42).runWith(StreamConverters.asInputStream())
56+
val readStream = MongoDBAsyncStreamSource(asyncStream, 42).runWith(StreamConverters.asInputStream())
5757
val readBytes = IOUtils.toByteArray(readStream)
5858

5959
bytes shouldBe readBytes
@@ -65,7 +65,7 @@ class AsyncStreamGraphTests
6565
val spiedStream = spy(inputStream)
6666
val asyncStream = AsyncStreamHelper.toAsyncInputStream(spiedStream)
6767

68-
val readStream = AsyncStreamSource(asyncStream, 42).runWith(StreamConverters.asInputStream())
68+
val readStream = MongoDBAsyncStreamSource(asyncStream, 42).runWith(StreamConverters.asInputStream())
6969
val readBytes = IOUtils.toByteArray(readStream)
7070

7171
bytes shouldBe readBytes
@@ -79,7 +79,7 @@ class AsyncStreamGraphTests
7979
doThrow(exception).when(inputStream).read(any())
8080
val asyncStream = AsyncStreamHelper.toAsyncInputStream(inputStream)
8181

82-
val (ioResult, p) = AsyncStreamSource(asyncStream).toMat(Sink.asPublisher(false))(Keep.both).run()
82+
val (ioResult, p) = MongoDBAsyncStreamSource(asyncStream).toMat(Sink.asPublisher(false))(Keep.both).run()
8383
val c = TestSubscriber.manualProbe[ByteString]()
8484
p.subscribe(c)
8585

@@ -92,7 +92,7 @@ class AsyncStreamGraphTests
9292
ioResult.futureValue.status.isFailure shouldBe true
9393
}
9494

95-
behavior of "AsyncStreamSink"
95+
behavior of "MongoDBAsyncStreamSink"
9696

9797
it should "write all bytes" in {
9898
val bytes = randomBytes(4000)
@@ -101,7 +101,7 @@ class AsyncStreamGraphTests
101101
val os = new ByteArrayOutputStream()
102102
val asyncStream = AsyncStreamHelper.toAsyncOutputStream(os)
103103

104-
val sink = AsyncStreamSink(asyncStream)
104+
val sink = MongoDBAsyncStreamSink(asyncStream)
105105
val ioResult = source.toMat(sink)(Keep.right).run()
106106

107107
ioResult.futureValue.count shouldBe bytes.length
@@ -117,7 +117,7 @@ class AsyncStreamGraphTests
117117
val outputStream = new CloseRecordingStream()
118118
val asyncStream = AsyncStreamHelper.toAsyncOutputStream(outputStream)
119119

120-
val sink = AsyncStreamSink(asyncStream)
120+
val sink = MongoDBAsyncStreamSink(asyncStream)
121121
val ioResult = source.toMat(sink)(Keep.right).run()
122122

123123
ioResult.futureValue.count shouldBe 4000
@@ -129,7 +129,7 @@ class AsyncStreamGraphTests
129129
val os = new ByteArrayOutputStream()
130130
val asyncStream = AsyncStreamHelper.toAsyncOutputStream(os)
131131

132-
val sink = AsyncStreamSink(asyncStream)
132+
val sink = MongoDBAsyncStreamSink(asyncStream)
133133
val ioResult = Source(1 to 10)
134134
.map { n
135135
if (n == 7) throw new Error("bees!")

0 commit comments

Comments
 (0)