20
20
import static org .mockito .ArgumentMatchers .eq ;
21
21
import static org .mockito .BDDMockito .willAnswer ;
22
22
import static org .mockito .Mockito .mock ;
23
+ import static org .mockito .Mockito .when ;
23
24
24
25
import com .google .cloud .PageImpl ;
25
26
import com .google .cloud .ReadChannel ;
26
27
import com .google .cloud .spring .storage .integration .GcsSessionFactory ;
27
28
import com .google .cloud .storage .Blob ;
29
+ import com .google .cloud .storage .BlobInfo ;
28
30
import com .google .cloud .storage .Storage ;
29
31
import java .io .InputStream ;
30
- import java .time .OffsetDateTime ;
31
- import java .time .ZoneOffset ;
32
+ import java .util .ArrayList ;
32
33
import java .util .Comparator ;
33
34
import java .util .List ;
34
- import java .util .stream .Collectors ;
35
- import java .util .stream .Stream ;
35
+ import org .junit .jupiter .api .BeforeAll ;
36
36
import org .junit .jupiter .api .Test ;
37
37
import org .junit .jupiter .api .extension .ExtendWith ;
38
38
import org .springframework .beans .factory .annotation .Autowired ;
52
52
import org .springframework .test .context .aot .DisabledInAotMode ;
53
53
import org .springframework .test .context .junit .jupiter .SpringExtension ;
54
54
55
- /** Tests for the streaming message source. */
55
+ /**
56
+ * Tests for the streaming message source.
57
+ */
56
58
@ ExtendWith (SpringExtension .class )
57
59
@ ContextConfiguration
58
60
@ DisabledInAotMode
59
61
class GcsStreamingMessageSourceTests {
60
62
61
- @ Autowired private PollableChannel unsortedChannel ;
63
+ @ Autowired
64
+ private PollableChannel unsortedChannel ;
65
+
66
+ @ Autowired
67
+ private PollableChannel sortedChannel ;
68
+
69
+ private static Blob alphaBlob = mock (Blob .class );
70
+
71
+ private static Blob betaBlob = mock (Blob .class );
72
+
73
+ private static Blob gammaBlob = mock (Blob .class );
74
+
75
+ private static List <Blob > blobList = new ArrayList <>();
62
76
63
- @ Autowired private PollableChannel sortedChannel ;
77
+ @ BeforeAll
78
+ static void setUp () {
79
+ blobList .add (mockBlob (gammaBlob , "gamma" ));
80
+ blobList .add (mockBlob (betaBlob , "beta" ));
81
+ blobList .add (mockBlob (alphaBlob , "alpha/alpha" ));
82
+ }
64
83
65
84
@ Test
66
85
void testInboundStreamingChannelAdapter () {
@@ -102,30 +121,26 @@ void testSortedInboundChannelAdapter() {
102
121
assertThat (message ).isNull ();
103
122
}
104
123
105
- private static Blob createBlob (String bucket , String name ) {
106
- Blob blob = mock (Blob .class );
107
- willAnswer (invocationOnMock -> bucket ).given (blob ).getBucket ();
108
- willAnswer (invocationOnMock -> name ).given (blob ).getName ();
109
- willAnswer (invocationOnMock -> OffsetDateTime .now (ZoneOffset .UTC )).given (blob ).getUpdateTimeOffsetDateTime ();
124
+ private static Blob mockBlob (Blob blob , String name ) {
125
+ when (blob .getBucket ()).thenReturn ("gcsbucket" );
126
+ when (blob .getName ()).thenReturn (name );
110
127
return blob ;
111
128
}
112
129
113
- /** Spring config for the tests. */
130
+ /**
131
+ * Spring config for the tests.
132
+ */
114
133
@ Configuration
115
134
@ EnableIntegration
116
135
public static class Config {
117
-
118
- @ Bean
119
- public Storage gcsClient () {
136
+ private Storage gcsClient () {
120
137
Storage gcs = mock (Storage .class );
121
-
122
- List <Blob > blobList = Stream .of (
123
- createBlob ("gcsbucket" , "gamma" ),
124
- createBlob ("gcsbucket" , "beta" ),
125
- createBlob ("gcsbucket" , "alpha/alpha" ))
126
- .collect (Collectors .toList ());
127
-
128
- willAnswer (invocationOnMock -> new PageImpl <>(null , null , blobList ))
138
+ willAnswer (
139
+ invocationOnMock ->
140
+ new PageImpl <>(
141
+ null ,
142
+ null ,
143
+ blobList ))
129
144
.given (gcs )
130
145
.list (eq ("gcsbucket" ));
131
146
@@ -144,9 +159,10 @@ public Storage gcsClient() {
144
159
145
160
@ Bean
146
161
@ InboundChannelAdapter (value = "unsortedChannel" , poller = @ Poller (fixedDelay = "100" ))
147
- public MessageSource <InputStream > unsortedChannelAdapter (Storage gcs ) {
162
+ public MessageSource <InputStream > unsortedChannelAdapter () {
148
163
GcsStreamingMessageSource adapter =
149
- new GcsStreamingMessageSource (new RemoteFileTemplate <>(new GcsSessionFactory (gcs )));
164
+ new GcsStreamingMessageSource (
165
+ new RemoteFileTemplate <>(new GcsSessionFactory (gcsClient ())));
150
166
adapter .setRemoteDirectory ("gcsbucket" );
151
167
adapter .setFilter (new AcceptOnceFileListFilter <>());
152
168
@@ -155,11 +171,11 @@ public MessageSource<InputStream> unsortedChannelAdapter(Storage gcs) {
155
171
156
172
@ Bean
157
173
@ InboundChannelAdapter (value = "sortedChannel" , poller = @ Poller (fixedDelay = "100" ))
158
- public MessageSource <InputStream > sortedChannelAdapter (Storage gcs ) {
174
+ public MessageSource <InputStream > sortedChannelAdapter () {
159
175
GcsStreamingMessageSource adapter =
160
176
new GcsStreamingMessageSource (
161
- new RemoteFileTemplate <>(new GcsSessionFactory (gcs )),
162
- Comparator .comparing (blob -> blob . getName () ));
177
+ new RemoteFileTemplate <>(new GcsSessionFactory (gcsClient () )),
178
+ Comparator .comparing (BlobInfo :: getName ));
163
179
164
180
adapter .setRemoteDirectory ("gcsbucket" );
165
181
adapter .setFilter (new AcceptOnceFileListFilter <>());
0 commit comments