@@ -22,8 +22,7 @@ import android.net.Network
22
22
import androidx.annotation.VisibleForTesting
23
23
import com.amplifyframework.datastore.DataStoreException
24
24
import io.reactivex.rxjava3.core.Observable
25
- import io.reactivex.rxjava3.core.ObservableEmitter
26
- import io.reactivex.rxjava3.core.ObservableOnSubscribe
25
+ import io.reactivex.rxjava3.subjects.BehaviorSubject
27
26
import java.util.concurrent.TimeUnit
28
27
29
28
/* *
@@ -54,43 +53,29 @@ public interface ReachabilityMonitor {
54
53
}
55
54
56
55
private class ReachabilityMonitorImpl constructor(val schedulerProvider : SchedulerProvider ) : ReachabilityMonitor {
57
- private var emitter: ObservableOnSubscribe <Boolean >? = null
58
-
56
+ private val subject = BehaviorSubject .create<Boolean >()
59
57
override fun configure (context : Context ) {
60
58
return configure(context, DefaultConnectivityProvider ())
61
59
}
62
60
63
61
override fun configure (context : Context , connectivityProvider : ConnectivityProvider ) {
64
- emitter = ObservableOnSubscribe { emitter ->
65
- val callback = getCallback(emitter)
66
- connectivityProvider.registerDefaultNetworkCallback(context, callback)
67
- // Provide the current network status upon subscription.
68
- emitter.onNext(connectivityProvider.hasActiveNetwork)
69
- }
70
- }
62
+ connectivityProvider.registerDefaultNetworkCallback(
63
+ context,
64
+ object : NetworkCallback () {
65
+ override fun onAvailable (network : Network ) {
66
+ subject.onNext(true )
67
+ }
71
68
72
- override fun getObservable (): Observable <Boolean > {
73
- emitter?.let { emitter ->
74
- return Observable .create(emitter)
75
- .subscribeOn(schedulerProvider.io())
76
- .debounce(250 , TimeUnit .MILLISECONDS , schedulerProvider.computation())
77
- } ? : run {
78
- throw DataStoreException (
79
- " ReachabilityMonitor has not been configured." ,
80
- " Call ReachabilityMonitor.configure() before calling ReachabilityMonitor.getObservable()"
81
- )
82
- }
69
+ override fun onLost (network : Network ) {
70
+ subject.onNext(false )
71
+ }
72
+ }
73
+ )
83
74
}
84
75
85
- private fun getCallback (emitter : ObservableEmitter <Boolean >): NetworkCallback {
86
- return object : NetworkCallback () {
87
- override fun onAvailable (network : Network ) {
88
- emitter.onNext(true )
89
- }
90
- override fun onLost (network : Network ) {
91
- emitter.onNext(false )
92
- }
93
- }
76
+ override fun getObservable (): Observable <Boolean > {
77
+ return subject.subscribeOn(schedulerProvider.io())
78
+ .debounce(250 , TimeUnit .MILLISECONDS , schedulerProvider.computation())
94
79
}
95
80
}
96
81
0 commit comments