-
Notifications
You must be signed in to change notification settings - Fork 122
(feat) Add a network status listener to restart DataStore after the network comes online #2148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
53 commits
Select commit
Hold shift + click to select a range
2960fa9
Add a network status listener to restart DataStore after the network …
mikschn-aws 507ac0d
Add Reachability monitor
mikschn-aws 2874238
working pretty well
mikschn-aws 22ff720
cleanup
mikschn-aws 099c2f1
update test
mikschn-aws d57b50a
fix: fix integration test and added logger to integration test (#2143)
sdhuka e0a9f16
Fix for when move to idle state is called twice (#2152)
gpanshu 0a45371
Update README.md (#2120)
div5yesh 0a2e0f9
Dengdan stress test (#2153)
dengdan154 7d17ae2
Merge branch 'main' into mikschn/restart-network
mikepschneider c35ef39
force build
mikschn-aws f94d0df
force build
mikschn-aws 19a5475
force build
mikschn-aws 5840f86
fix typo
mikschn-aws fe760fd
Add a network status listener to restart DataStore after the network …
mikschn-aws c62cab1
Add Reachability monitor
mikschn-aws 9d9d2be
working pretty well
mikschn-aws a25b9a4
cleanup
mikschn-aws 1b02840
update test
mikschn-aws 8aa659d
force build
mikschn-aws b04e1af
force build
mikschn-aws d921cf1
force build
mikschn-aws aed7746
fix typo
mikschn-aws 2f87711
reply to comments
mikschn-aws 503f2de
Add testImplementation lin eto compile tests correctly in intellij
mikschn-aws 3b5b01f
reply to comments
mikschn-aws 9f5fa8a
make ReachabilityMonitor expose the observable
mikschn-aws 1d7768e
merge from main
mikschn-aws 99403cf
Update datastore plugin to use the reachability monitor
mikschn-aws dd069b0
cleanup
mikschn-aws 83e1df2
cleanup
mikschn-aws 5441395
cleanup
mikschn-aws 9f2d67f
force tests
mikschn-aws 5318ce7
Merge branch 'main' into mikschn/restart-network
mikepschneider d46f578
cleanup
mikschn-aws 98891ae
cleanup
mikschn-aws 373daf3
force tests
mikschn-aws 6ef37f0
force tests
mikschn-aws 23be191
force tests
mikschn-aws eb621b5
force tests
mikschn-aws 736a8fe
force tests
mikschn-aws cf8cfe8
force tests
mikschn-aws 353878b
Merge branch 'main' into mikschn/restart-network
mikepschneider 7eb4304
add NETWORK_STATUS messages back to make integration tests pass
mikschn-aws fbee0b7
fix typo
mikschn-aws 52c0afa
ignore testIdentifyUserWithUserAttributes, see https://github.com/aws…
mikschn-aws a3dc0c2
cleanup imports
mikschn-aws 201cb2a
force tests
mikschn-aws 905561b
force tests
mikschn-aws 0b30c7a
remove comments
mikschn-aws be87fed
cleanup
mikschn-aws 526e72a
remove use of synchronous datastore in a test
mikschn-aws 610cf87
force tests
mikschn-aws File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
130 changes: 130 additions & 0 deletions
130
aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/ReachabilityMonitor.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
/* | ||
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package com.amplifyframework.datastore.syncengine | ||
|
||
import android.content.Context | ||
import android.net.ConnectivityManager | ||
import android.net.ConnectivityManager.NetworkCallback | ||
import android.net.Network | ||
import androidx.annotation.VisibleForTesting | ||
import com.amplifyframework.datastore.DataStoreException | ||
import io.reactivex.rxjava3.core.Observable | ||
import io.reactivex.rxjava3.core.ObservableEmitter | ||
import io.reactivex.rxjava3.core.ObservableOnSubscribe | ||
import java.util.concurrent.TimeUnit | ||
|
||
/** | ||
* The ReachabilityMonitor is responsible for watching the network status as provided by the OS. | ||
* It returns an observable that publishes "true" when the network becomes available and "false" when | ||
* the network is lost. It publishes the current status on subscription. | ||
* | ||
* ReachabilityMonitor does not try to monitor the DataStore websockets or the status of the AppSync service. | ||
* | ||
* The network changes are debounced with a 250 ms delay to allow some time for one network to connect after another | ||
* network has disconnected (for example, wifi is lost, then cellular connects) to reduce thrashing. | ||
*/ | ||
internal interface ReachabilityMonitor { | ||
fun configure(context: Context) | ||
@VisibleForTesting | ||
fun configure(context: Context, connectivityProvider: ConnectivityProvider) | ||
|
||
companion object { | ||
fun create(): ReachabilityMonitor { | ||
return ReachabilityMonitorImpl(ProdSchedulerProvider()) | ||
} | ||
|
||
fun createForTesting(baseSchedulerProvider: SchedulerProvider): ReachabilityMonitor { | ||
return ReachabilityMonitorImpl(baseSchedulerProvider) | ||
} | ||
} | ||
fun getObservable(): Observable<Boolean> | ||
} | ||
|
||
private class ReachabilityMonitorImpl constructor(val schedulerProvider: SchedulerProvider) : ReachabilityMonitor { | ||
private var emitter: ObservableOnSubscribe<Boolean>? = null | ||
|
||
override fun configure(context: Context) { | ||
return configure(context, DefaultConnectivityProvider()) | ||
} | ||
|
||
override fun configure(context: Context, connectivityProvider: ConnectivityProvider) { | ||
emitter = ObservableOnSubscribe { emitter -> | ||
val callback = getCallback(emitter) | ||
connectivityProvider.registerDefaultNetworkCallback(context, callback) | ||
// Provide the current network status upon subscription. | ||
emitter.onNext(connectivityProvider.hasActiveNetwork) | ||
} | ||
} | ||
|
||
override fun getObservable(): Observable<Boolean> { | ||
emitter?.let { emitter -> | ||
return Observable.create(emitter) | ||
.subscribeOn(schedulerProvider.io()) | ||
.debounce(250, TimeUnit.MILLISECONDS, schedulerProvider.computation()) | ||
} ?: run { | ||
throw DataStoreException( | ||
"ReachabilityMonitor has not been configured.", | ||
"Call ReachabilityMonitor.configure() before calling ReachabilityMonitor.getObservable()" | ||
) | ||
} | ||
} | ||
|
||
private fun getCallback(emitter: ObservableEmitter<Boolean>): NetworkCallback { | ||
return object : NetworkCallback() { | ||
override fun onAvailable(network: Network) { | ||
emitter.onNext(true) | ||
} | ||
override fun onLost(network: Network) { | ||
emitter.onNext(false) | ||
} | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* This interface puts an abstraction layer over ConnectivityManager. Since ConnectivityManager | ||
* is a concrete class created within context.getSystemService() it can't be overridden with a test | ||
* implementation, so this interface works around that issue. | ||
*/ | ||
internal interface ConnectivityProvider { | ||
val hasActiveNetwork: Boolean | ||
fun registerDefaultNetworkCallback(context: Context, callback: NetworkCallback) | ||
} | ||
|
||
private class DefaultConnectivityProvider : ConnectivityProvider { | ||
|
||
private var connectivityManager: ConnectivityManager? = null | ||
|
||
override val hasActiveNetwork: Boolean | ||
get() = connectivityManager?.let { it.activeNetwork != null } | ||
?: run { | ||
throw DataStoreException( | ||
"ReachabilityMonitor has not been configured.", | ||
"Call ReachabilityMonitor.configure() before calling ReachabilityMonitor.getObservable()" | ||
) | ||
} | ||
|
||
override fun registerDefaultNetworkCallback(context: Context, callback: NetworkCallback) { | ||
connectivityManager = context.getSystemService(ConnectivityManager::class.java) | ||
connectivityManager?.let { it.registerDefaultNetworkCallback(callback) } | ||
?: run { | ||
throw DataStoreException( | ||
"ConnectivityManager not available", | ||
"No recovery suggestion is available" | ||
) | ||
} | ||
} | ||
} |
34 changes: 34 additions & 0 deletions
34
aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SchedulerProvider.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
package com.amplifyframework.datastore.syncengine | ||
|
||
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers | ||
import io.reactivex.rxjava3.core.Scheduler | ||
import io.reactivex.rxjava3.schedulers.Schedulers | ||
|
||
/** | ||
* This interface provides a way to give custom schedulers to RX observables for testing. | ||
*/ | ||
interface SchedulerProvider { | ||
fun io(): Scheduler | ||
fun computation(): Scheduler | ||
fun ui(): Scheduler | ||
} | ||
|
||
class ProdSchedulerProvider : SchedulerProvider { | ||
override fun computation() = Schedulers.computation() | ||
override fun ui() = AndroidSchedulers.mainThread() | ||
override fun io() = Schedulers.io() | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
72 changes: 72 additions & 0 deletions
72
...astore/src/test/java/com/amplifyframework/datastore/syncengine/ReachabilityMonitorTest.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package com.amplifyframework.datastore.syncengine | ||
|
||
import android.content.Context | ||
import android.net.ConnectivityManager | ||
import android.net.Network | ||
import com.amplifyframework.datastore.DataStoreException | ||
import io.reactivex.rxjava3.core.BackpressureStrategy | ||
import io.reactivex.rxjava3.schedulers.TestScheduler | ||
import io.reactivex.rxjava3.subscribers.TestSubscriber | ||
import java.util.concurrent.TimeUnit | ||
import org.junit.Test | ||
import org.mockito.Mockito.mock | ||
|
||
class ReachabilityMonitorTest { | ||
|
||
// Test that calling getObservable() without calling configure() first throws a DataStoreException | ||
@Test(expected = DataStoreException::class) | ||
fun testReachabilityConfigThrowsException() { | ||
ReachabilityMonitor.create().getObservable() | ||
} | ||
|
||
// Test that the debounce and the event publishing in ReachabilityMonitor works as expected. | ||
// Events that occur within 250 ms of each other should be debounced so that only the last event | ||
// of the sequence is published. | ||
@Test | ||
fun testReachabilityDebounce() { | ||
var callback: ConnectivityManager.NetworkCallback? = null | ||
|
||
val connectivityProvider = object : ConnectivityProvider { | ||
override val hasActiveNetwork: Boolean | ||
get() = run { | ||
return true | ||
} | ||
override fun registerDefaultNetworkCallback( | ||
context: Context, | ||
callback2: ConnectivityManager.NetworkCallback | ||
) { | ||
callback = callback2 | ||
} | ||
} | ||
|
||
val mockContext = mock(Context::class.java) | ||
// TestScheduler allows the virtual time to be advanced by exact amounts, to allow for repeatable tests | ||
val testScheduler = TestScheduler() | ||
val reachabilityMonitor = ReachabilityMonitor.createForTesting(TestSchedulerProvider(testScheduler)) | ||
reachabilityMonitor.configure(mockContext, connectivityProvider) | ||
|
||
// TestSubscriber allows for assertions and awaits on the items it observes | ||
val testSubscriber = TestSubscriber<Boolean>() | ||
reachabilityMonitor.getObservable() | ||
// TestSubscriber requires a Flowable | ||
.toFlowable(BackpressureStrategy.BUFFER) | ||
.subscribe(testSubscriber) | ||
|
||
val network = mock(Network::class.java) | ||
// Should provide initial network state (true) upon subscription (after debounce) | ||
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS) | ||
callback!!.onAvailable(network) | ||
callback!!.onAvailable(network) | ||
callback!!.onLost(network) | ||
// Should provide false after debounce | ||
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS) | ||
callback!!.onAvailable(network) | ||
// Should provide true after debounce | ||
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS) | ||
callback!!.onAvailable(network) | ||
// Should provide true after debounce | ||
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS) | ||
|
||
testSubscriber.assertValues(true, false, true, true) | ||
} | ||
} |
16 changes: 16 additions & 0 deletions
16
...atastore/src/test/java/com/amplifyframework/datastore/syncengine/TestSchedulerProvider.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package com.amplifyframework.datastore.syncengine | ||
|
||
import io.reactivex.rxjava3.schedulers.Schedulers | ||
import io.reactivex.rxjava3.schedulers.TestScheduler | ||
|
||
class TrampolineSchedulerProvider : SchedulerProvider { | ||
override fun computation() = Schedulers.trampoline() | ||
override fun ui() = Schedulers.trampoline() | ||
override fun io() = Schedulers.trampoline() | ||
} | ||
|
||
class TestSchedulerProvider(private val scheduler: TestScheduler) : SchedulerProvider { | ||
override fun computation() = scheduler | ||
override fun ui() = scheduler | ||
override fun io() = scheduler | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is configure needed, or can this just be done in the init of the ReachabilityMonitor? Side effect right now of forgetting configure is a crash, and I don't see too much of a reason not just to allow class to configure itself since it holds on to the current network status to pass to a newly attached observer.