Skip to content

Rework the way we init and close the RustMatrixRoom #957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import android.os.Parcelable
import androidx.compose.runtime.Composable
import androidx.compose.runtime.DisposableEffect
import androidx.compose.ui.Modifier
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import com.bumble.appyx.core.composable.Children
import com.bumble.appyx.core.lifecycle.subscribe
Expand Down Expand Up @@ -161,13 +162,16 @@ class RoomLoadedFlowNode @AssistedInject constructor(

@Composable
override fun View(modifier: Modifier) {
// Rely on the View Lifecycle instead of the Node Lifecycle,
// Rely on the View Lifecycle in addition to the Node Lifecycle,
// because this node enters 'onDestroy' before his children, so it can leads to
// using the room in a child node where it's already closed.
DisposableEffect(Unit) {
inputs.room.open()
inputs.room.subscribeToSync()
onDispose {
inputs.room.close()
inputs.room.unsubscribeFromSync()
if (lifecycle.currentState == Lifecycle.State.DESTROYED) {
inputs.room.destroy()
}
}
}
Children(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ interface MatrixRoom : Closeable {

val timeline: MatrixTimeline

fun open(): Result<Unit>
fun destroy()

fun subscribeToSync()

fun unsubscribeFromSync()

suspend fun userDisplayName(userId: UserId): Result<String?>

Expand Down Expand Up @@ -133,6 +137,8 @@ interface MatrixRoom : Closeable {
zoomLevel: Int? = null,
assetType: AssetType? = null,
): Result<Unit>

override fun close() = destroy()
}


Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ import io.element.android.libraries.matrix.impl.core.toProgressWatcher
import io.element.android.libraries.matrix.impl.media.map
import io.element.android.libraries.matrix.impl.room.location.toInner
import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline
import io.element.android.libraries.matrix.impl.timeline.backPaginationStatusFlow
import io.element.android.libraries.matrix.impl.timeline.eventOrigin
import io.element.android.libraries.matrix.impl.timeline.timelineDiffFlow
import io.element.android.libraries.sessionstorage.api.SessionData
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.coroutines.CoroutineScope
Expand All @@ -51,11 +48,7 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.EventItemOrigin
import org.matrix.rustcomponents.sdk.RequiredState
import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.RoomListItem
Expand Down Expand Up @@ -88,7 +81,6 @@ class RustMatrixRoom(

private val roomCoroutineScope = sessionCoroutineScope.childScope(coroutineDispatchers.main, "RoomScope-$roomId")
private val _membersStateFlow = MutableStateFlow<MatrixRoomMembersState>(MatrixRoomMembersState.Unknown)
private val isInit = MutableStateFlow(false)
private val _syncUpdateFlow = MutableStateFlow(0L)
private val _timeline by lazy {
RustMatrixTimeline(
Expand All @@ -97,6 +89,7 @@ class RustMatrixRoom(
roomCoroutineScope = roomCoroutineScope,
dispatcher = roomDispatcher,
lastLoginTimestamp = sessionData.loginTimestamp,
onNewSyncedEvent = { _syncUpdateFlow.value = systemClock.epochMillis() }
)
}

Expand All @@ -106,8 +99,7 @@ class RustMatrixRoom(

override val timeline: MatrixTimeline = _timeline

override fun open(): Result<Unit> {
if (isInit.value) return Result.failure(IllegalStateException("Listener already registered"))
override fun subscribeToSync() {
val settings = RoomSubscription(
requiredState = listOf(
RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""),
Expand All @@ -118,35 +110,16 @@ class RustMatrixRoom(
timelineLimit = null
)
roomListItem.subscribe(settings)
roomCoroutineScope.launch(roomDispatcher) {
innerRoom.timelineDiffFlow { initialList ->
_timeline.postItems(initialList)
}.onEach { diff ->
if (diff.eventOrigin() == EventItemOrigin.SYNC) {
_syncUpdateFlow.value = systemClock.epochMillis()
}
_timeline.postDiff(diff)
}.launchIn(this)

innerRoom.backPaginationStatusFlow()
.onEach {
_timeline.postPaginationStatus(it)
}.launchIn(this)
}

fetchMembers()
}
isInit.value = true
return Result.success(Unit)
override fun unsubscribeFromSync() {
roomListItem.unsubscribe()
}

override fun close() {
if (isInit.value) {
isInit.value = false
roomCoroutineScope.cancel()
roomListItem.unsubscribe()
innerRoom.destroy()
roomListItem.destroy()
}
override fun destroy() {
roomCoroutineScope.cancel()
innerRoom.destroy()
roomListItem.destroy()
}

override val name: String?
Expand Down Expand Up @@ -363,12 +336,6 @@ class RustMatrixRoom(
}
}

private suspend fun fetchMembers() = withContext(roomDispatcher) {
runCatching {
innerRoom.fetchMembers()
}
}

override suspend fun reportContent(eventId: EventId, reason: String, blockUserId: UserId?): Result<Unit> = withContext(roomDispatcher) {
runCatching {
innerRoom.reportContent(eventId = eventId.value, score = null, reason = reason)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import io.element.android.libraries.matrix.impl.timeline.item.event.EventMessage
import io.element.android.libraries.matrix.impl.timeline.item.event.EventTimelineItemMapper
import io.element.android.libraries.matrix.impl.timeline.item.event.TimelineEventContentMapper
import io.element.android.libraries.matrix.impl.timeline.item.virtual.VirtualTimelineItemMapper
import kotlinx.coroutines.CompletableDeferred
import io.element.android.libraries.matrix.impl.timeline.postprocessor.TimelineEncryptedHistoryPostProcessor
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
Expand All @@ -37,17 +37,21 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.getAndUpdate
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.BackPaginationStatus
import org.matrix.rustcomponents.sdk.EventItemOrigin
import org.matrix.rustcomponents.sdk.PaginationOptions
import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.TimelineDiff
import org.matrix.rustcomponents.sdk.TimelineItem
import timber.log.Timber
import java.util.concurrent.atomic.AtomicBoolean
import java.util.Date
import java.util.concurrent.atomic.AtomicBoolean

private const val INITIAL_MAX_SIZE = 50

Expand All @@ -57,6 +61,7 @@ class RustMatrixTimeline(
private val innerRoom: Room,
private val dispatcher: CoroutineDispatcher,
private val lastLoginTimestamp: Date?,
private val onNewSyncedEvent: () -> Unit,
) : MatrixTimeline {

private val initLatch = CompletableDeferred<Unit>()
Expand Down Expand Up @@ -93,13 +98,40 @@ class RustMatrixTimeline(

override val paginationState: StateFlow<MatrixTimeline.PaginationState> = _paginationState.asStateFlow()

init {
Timber.d("Initialize timeline for room ${matrixRoom.roomId}")
roomCoroutineScope.launch(dispatcher) {
innerRoom.timelineDiffFlow { initialList ->
postItems(initialList)
}.onEach { diff ->
if (diff.eventOrigin() == EventItemOrigin.SYNC) {
onNewSyncedEvent()
}
postDiff(diff)
}.launchIn(this)

innerRoom.backPaginationStatusFlow()
.onEach {
postPaginationStatus(it)
}.launchIn(this)

fetchMembers()
}
}

private suspend fun fetchMembers() = withContext(dispatcher) {
runCatching {
innerRoom.fetchMembers()
}
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
override val timelineItems: Flow<List<MatrixTimelineItem>> = _timelineItems.sample(50)
.mapLatest { items ->
encryptedHistoryPostProcessor.process(items)
}

internal suspend fun postItems(items: List<TimelineItem>) {
private suspend fun postItems(items: List<TimelineItem>) {
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
timelineDiffProcessor.postItems(it)
Expand All @@ -108,12 +140,12 @@ class RustMatrixTimeline(
initLatch.complete(Unit)
}

internal suspend fun postDiff(timelineDiff: TimelineDiff) {
private suspend fun postDiff(timelineDiff: TimelineDiff) {
initLatch.await()
timelineDiffProcessor.postDiff(timelineDiff)
}

internal fun postPaginationStatus(status: BackPaginationStatus) {
private fun postPaginationStatus(status: BackPaginationStatus) {
_paginationState.getAndUpdate { currentPaginationState ->
if (hasEncryptionHistoryBanner()) {
return@getAndUpdate currentPaginationState.copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ class FakeMatrixRoom(
private val _sentLocations = mutableListOf<SendLocationInvocation>()
val sentLocations: List<SendLocationInvocation> = _sentLocations


var invitedUserId: UserId? = null
private set

Expand Down Expand Up @@ -128,9 +127,11 @@ class FakeMatrixRoom(

override val timeline: MatrixTimeline = matrixTimeline

override fun open(): Result<Unit> {
return Result.success(Unit)
}
override fun subscribeToSync() = Unit

override fun unsubscribeFromSync() = Unit

override fun destroy() = Unit

override suspend fun userDisplayName(userId: UserId): Result<String?> = simulateLongTask {
userDisplayNameResult
Expand Down Expand Up @@ -283,8 +284,6 @@ class FakeMatrixRoom(
return sendLocationResult
}

override fun close() = Unit

fun givenLeaveRoomError(throwable: Throwable?) {
this.leaveRoomError = throwable
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ class RoomListScreen(
Singleton.appScope.launch {
withContext(coroutineDispatchers.io) {
matrixClient.getRoom(roomId)!!.use { room ->
room.open()
room.timeline.paginateBackwards(20, 50)
}
}
Expand Down