Experiment with a bearer that avoids extra threads in a Driver #7

Open
@coot

We need a new Channel type:

data Channel m = Channel {
    send :: LBS.ByteString -> m (),
    recv :: STM m (Maybe LBS.ByteString)
  }
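
As a rough illustration of the shape above, here is a minimal sketch specialised to IO and the `stm` package rather than the `m` / `STM m` pair used in the issue. The name `newConnectedChannels` and the convention that `Nothing` marks end of stream are assumptions made for this example only.

```haskell
import Control.Concurrent.STM
import qualified Data.ByteString.Lazy as LBS

-- Channel from the issue, specialised to IO / STM for this sketch.
data Channel = Channel
  { send :: LBS.ByteString -> IO ()
  , recv :: STM (Maybe LBS.ByteString)
  }

-- | Hypothetical helper: two connected endpoints backed by in-memory
-- queues. A 'Nothing' in a queue would mark end of stream (assumption).
newConnectedChannels :: IO (Channel, Channel)
newConnectedChannels = do
  qa <- newTQueueIO
  qb <- newTQueueIO
  let mk out inp = Channel
        { send = atomically . writeTQueue out . Just
        , recv = readTQueue inp   -- retries until a chunk is available
        }
  pure (mk qa qb, mk qb qa)
```

Because `recv` is an STM action, a caller can compose it with other STM actions (for example with `orElse`) before committing to a transaction, which is what makes it possible to avoid a dedicated receiving thread.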

The Driver type can stay as is. However, the Codec type is not general enough:

data Codec ps failure m bytes = Codec {
       encode :: forall (st :: ps) (st' :: ps).
                 SingI st
              => ActiveState st
              => Message ps st st'
              -> bytes,

       decode :: forall (st :: ps).
                 ActiveState st
              => Sing st
              -> m (DecodeStep bytes failure m (SomeMessage st))
     }

We will need a codec that works in both m and STM m. cborg requires access to ST operations (see e.g. mkCodecCborStrictST), but the STM monad has no MonadST instance.
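
The constraint can be illustrated with plain `base`, without cborg. In the sketch below, `sumST` is a hypothetical stand-in for an ST-based decoding step. `stToIO` lifts it into IO, but `base` offers no comparable safe function from `ST s a` to `STM a`.

```haskell
import Control.Monad.ST (ST, stToIO)
import Data.STRef (newSTRef, modifySTRef', readSTRef)

-- | Hypothetical stand-in for a decoder step that uses mutable ST state.
sumST :: [Int] -> ST s Int
sumST xs = do
  ref <- newSTRef 0
  mapM_ (\x -> modifySTRef' ref (+ x)) xs
  readSTRef ref

-- | Lifting ST into IO is supported directly by base.
runInIO :: [Int] -> IO Int
runInIO = stToIO . sumST

-- There is no safe 'ST s a -> STM a' in base, so the same step cannot
-- be run inside an STM transaction this way.
```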

In the coot/typed-protocols-rewrite branch we have:

runDecoderWithChannel :: MonadSTM m
                      => Channel m bytes
                      -> Maybe bytes
                      -> DecodeStep bytes failure m a
                      -> m (Either failure (a, Maybe bytes))


tryRunDecoderWithChannel :: Monad m
                         => Channel m bytes
                         -> Maybe bytes
                         -> DecodeStep bytes failure m (SomeMessage st)
                         -> m (Either failure
                                (Either (DriverState ps pr st bytes failure (Maybe bytes) m)
                                        (SomeMessage st, Maybe bytes)))

Because of the above constraint we cannot change its signature to STM m, but we can guarantee that all recvs are non-blocking (e.g. atomically $ (Just <$> recv) `orElse` pure Nothing).
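
The non-blocking pattern mentioned above can be sketched as follows, using the `stm` package and IO instead of the abstract `m`. The names `tryRecv` and `pollQueue` are hypothetical and exist only for this example.

```haskell
import Control.Concurrent.STM

-- | Turn a blocking STM read into a non-blocking one: if the read
-- would retry, fall through to the second branch and return 'Nothing'.
tryRecv :: STM a -> STM (Maybe a)
tryRecv blockingRecv = (Just <$> blockingRecv) `orElse` pure Nothing

-- | Poll a queue once without blocking the calling thread.
pollQueue :: TQueue a -> IO (Maybe a)
pollQueue q = atomically (tryRecv (readTQueue q))
```

Note the parentheses around `Just <$> blockingRecv`: a backticked `orElse` without a fixity declaration binds tighter than `<$>`, so the unparenthesised form would group differently.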

tryRunDecoderWithChannel is used to implement the tryRecvMessage record field of Driver, and it seems plausible to implement it with recv :: STM m (Maybe ByteString).

data Driver ps (pr :: PeerRole) bytes failure dstate m =
        Driver {
          ...
          tryRecvMessage :: forall (st :: ps).
                            SingI st
                         => ActiveState st
                         => ReflRelativeAgency (StateAgency st)
                                                TheyHaveAgency
                                               (Relative pr (StateAgency st))
                         -> DriverState ps pr st bytes failure dstate m
                         -> m (Either (DriverState ps pr st bytes failure dstate m)
                                      ( SomeMessage st
                                      , dstate
                                      ))
        , -- | Construct a non-blocking STM action which awaits the
          -- message.
          --
          recvMessageSTM :: forall (st :: ps).
                            SingI st
                         => ActiveState st
                         => ReflRelativeAgency (StateAgency st)
                                                TheyHaveAgency
                                               (Relative pr (StateAgency st))
                         -> DriverState ps pr st bytes failure dstate m
                         -> m (STM m (SomeMessage st, dstate))

        , startDState    :: dstate
        }

The open question is how to implement recvMessageSTM. It seems that this requires being able to run a decoder in the STM monad, without forking a thread.
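
To make the requirement concrete, here is a sketch of what running a decoder inside STM could look like if the decoder were pure between steps. `PureStep`, `runDecoderSTM` and `takeBytes` are hypothetical names introduced for this example. They are not part of typed-protocols or cborg, and the sketch does not address cborg's use of ST.

```haskell
import Control.Concurrent.STM
import qualified Data.ByteString.Lazy as LBS

-- | Hypothetical pure variant of 'DecodeStep': no effects between steps.
data PureStep failure a
  = Partial (Maybe LBS.ByteString -> PureStep failure a)
  | Done a (Maybe LBS.ByteString)   -- result and trailing bytes
  | Fail failure

-- | Drive a pure decoder inside a single STM transaction.
runDecoderSTM :: STM (Maybe LBS.ByteString)   -- blocking chunk source
              -> Maybe LBS.ByteString         -- leftover from a previous run
              -> PureStep failure a
              -> STM (Either failure (a, Maybe LBS.ByteString))
runDecoderSTM recvChunk = go
  where
    go _ (Done a trailing)         = pure (Right (a, trailing))
    go _ (Fail err)                = pure (Left err)
    go (Just leftover) (Partial k) = go Nothing (k (Just leftover))
    go Nothing (Partial k)         = recvChunk >>= go Nothing . k

-- | Toy decoder: collect exactly @n@ bytes, return the rest as trailing.
takeBytes :: Int -> PureStep String LBS.ByteString
takeBytes n = go LBS.empty
  where
    go acc
      | LBS.length acc >= fromIntegral n =
          let (msg, rest) = LBS.splitAt (fromIntegral n) acc
          in Done msg (if LBS.null rest then Nothing else Just rest)
      | otherwise = Partial $ \mchunk -> case mchunk of
          Nothing    -> Fail "unexpected end of input"
          Just chunk -> go (acc <> chunk)
```

Since the whole decode runs in one transaction, a missing chunk makes the transaction retry and no input is consumed until a complete message is available. Whether that behaviour is acceptable for a real bearer is a design question this sketch leaves open.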

GHC exposes unsafeIOToSTM, which could be used to lift ST to STM (via IO), but this is a rather dodgy approach, so a different solution is needed. However, a rudimentary inspection of the cborg library shows that ST is deeply ingrained, e.g.
