Skip to content

error: future cannot be sent between threads safely #542

Closed
@lidarbtc

Description

@lidarbtc

This is async version ironrdp example code + tauri.

use anyhow::{Context, Result};
use base64::{engine::general_purpose, Engine as _};
use ironrdp::connector::ConnectionResult;
use ironrdp::connector::{self, Credentials};
use ironrdp::pdu::gcc::KeyboardType;
use ironrdp::pdu::rdp::{capability_sets::MajorPlatformType, client_info::PerformanceFlags};
use ironrdp::session::image::DecodedImage;
use ironrdp::session::{ActiveStage, ActiveStageOutput};
use std::sync::Arc;
use tauri::{AppHandle, Emitter, State};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, Mutex};

pub mod network_client;

#[derive(Clone)]
struct RdpState {
    tx: Arc<Mutex<Option<mpsc::Sender<RdpCommand>>>>,
}

enum RdpCommand {
    MouseEvent {
        x: u16,
        y: u16,
        button: Option<String>,
        is_pressed: bool,
    },
}

#[tauri::command]
async fn start_rdp_stream(
    app_handle: AppHandle,
    state: State<'_, RdpState>,
    hostname: String,
    port: u16,
    username: String,
    password: String,
    domain: String,
) -> Result<(), String> {
    let state = Arc::new(state.inner().clone());
    let app_handle = app_handle.clone();

    tokio::spawn(async move {
        if let Err(e) = run_rdp_session(
            app_handle, state, hostname, port, username, password, domain,
        )
        .await
        {
            eprintln!("RDP session error: {}", e);
        }
    });

    Ok(())
}

async fn run_rdp_session(
    app_handle: AppHandle,
    state: Arc<RdpState>,
    hostname: String,
    port: u16,
    username: String,
    password: String,
    domain: String,
) -> Result<(), String> {
    let config = build_config(username, password, Some(domain));

    let (connection_result, mut framed) = connect(config, hostname, port)
        .await
        .context("connect")
        .map_err(|e| e.to_string())?;

    let mut image = DecodedImage::new(
        ironrdp_graphics::image_processing::PixelFormat::RgbA32,
        connection_result.desktop_size.width,
        connection_result.desktop_size.height,
    );

    let (tx, mut rx) = mpsc::channel(100);
    {
        let mut state_tx = state.tx.lock().await;
        *state_tx = Some(tx);
    }

    let mut active_stage = ActiveStage::new(connection_result);

    loop {
        tokio::select! {
            frame_result = framed.read_pdu() => {
                match frame_result {
                    Ok((action, payload)) => {
                        match active_stage.process(&mut image, action, &payload) {
                            Ok(outputs) => {
                                for out in outputs {
                                    handle_active_stage_output(&mut framed, &app_handle, &mut image, out).await?;
                                }
                            }
                            Err(e) => {
                                eprintln!("Error processing RDP event: {:?}", e);
                            }
                        }
                    }
                    Err(e) => {
                        if e.kind() == std::io::ErrorKind::WouldBlock {
                            continue;
                        } else {
                            if let Err(emit_err) = app_handle.emit("rdp-error", format!("RDP read error: {:?}", e)) {
                                eprintln!("Failed to emit RDP error: {}", emit_err);
                            }
                            break;
                        }
                    }
                }
            }
            Some(command) = rx.recv() => {
                match command {
                    RdpCommand::MouseEvent { x, y, button, is_pressed } => {
                        println!("Mouse event: x={}, y={}, button={:?}, is_pressed={}", x, y, button, is_pressed);
                        // Handle mouse event
                    }
                }
            }
        }
    }

    Ok(())
}

async fn handle_active_stage_output(
    framed: &mut UpgradedFramed,
    app_handle: &AppHandle,
    image: &mut DecodedImage,
    output: ActiveStageOutput,
) -> Result<(), String> {
    match output {
        ActiveStageOutput::GraphicsUpdate(_region) => {
            let mut bmp = bmp::Image::new(u32::from(image.width()), u32::from(image.height()));

            image
                .data()
                .chunks_exact(
                    usize::from(image.width())
                        .checked_mul(4)
                        .expect("never overflow"),
                )
                .enumerate()
                .for_each(|(y, row)| {
                    row.chunks_exact(4).enumerate().for_each(|(x, pixel)| {
                        let r = pixel[0];
                        let g = pixel[1];
                        let b = pixel[2];
                        let _a = pixel[3];
                        bmp.set_pixel(
                            u32::try_from(x).unwrap(),
                            u32::try_from(y).unwrap(),
                            bmp::Pixel::new(r, g, b),
                        );
                    })
                });

            let mut bmp_data = Vec::new();
            bmp.to_writer(&mut bmp_data)
                .expect("Failed to write BMP data");

            let base64_image = general_purpose::STANDARD.encode(&bmp_data);

            if let Err(e) = app_handle.emit("rdp-frame", &base64_image) {
                eprintln!("Failed to send frame: {}", e);
            }
        }
        ActiveStageOutput::ResponseFrame(frame) => {
            if let Err(e) = framed.write_all(&frame).await {
                eprintln!("Failed to write response frame: {}", e);
            }
        }
        ActiveStageOutput::PointerDefault => {
            // default pointer
        }
        ActiveStageOutput::PointerHidden => {
            // hidden pointer
        }
        ActiveStageOutput::Terminate(_) => return Ok(()),
        _ => {}
    }
    Ok(())
}

#[tauri::command]
async fn send_mouse_event(
    state: State<'_, RdpState>,
    x: u16,
    y: u16,
    button: Option<String>,
    is_pressed: bool,
) -> Result<(), String> {
    let tx = state.tx.lock().await;
    if let Some(tx) = tx.as_ref() {
        tx.send(RdpCommand::MouseEvent {
            x,
            y,
            button,
            is_pressed,
        })
        .await
        .map_err(|e| format!("Failed to send mouse event: {}", e))
    } else {
        Err("RDP session not started".to_string())
    }
}

type UpgradedFramed = ironrdp_tokio::TokioFramed<tokio_rustls::client::TlsStream<TcpStream>>;

async fn connect(
    config: connector::Config,
    server_name: String,
    port: u16,
) -> anyhow::Result<(ConnectionResult, UpgradedFramed)> {
    let server_addr = lookup_addr(&server_name, port).context("lookup addr")?;

    let tcp_stream = TcpStream::connect(server_addr)
        .await
        .context("TCP connect")?;

    tcp_stream.set_nodelay(true)?;

    let mut framed = ironrdp_tokio::TokioFramed::new(tcp_stream);

    let mut connector = connector::ClientConnector::new(config).with_server_addr(server_addr);

    let should_upgrade = ironrdp_tokio::connect_begin(&mut framed, &mut connector)
        .await
        .context("begin connection")?;

    // Ensure there is no leftover
    let initial_stream = framed.into_inner_no_leftover();

    let (upgraded_stream, server_public_key) =
        ironrdp_tls::upgrade(initial_stream, &server_addr.to_string())
            .await
            .map_err(|e| connector::custom_err!("TLS upgrade", e))?;

    let upgraded = ironrdp_tokio::mark_as_upgraded(should_upgrade, &mut connector);

    let mut upgraded_framed = ironrdp_tokio::TokioFramed::new(upgraded_stream);

    let mut network_client = crate::network_client::ReqwestNetworkClient::new();

    let connection_result = ironrdp_tokio::connect_finalize(
        upgraded,
        &mut upgraded_framed,
        connector,
        server_name.into(),
        server_public_key,
        Some(&mut network_client),
        None,
    )
    .await
    .context("finalize connection")?;

    Ok((connection_result, upgraded_framed))
}

fn lookup_addr(hostname: &str, port: u16) -> anyhow::Result<std::net::SocketAddr> {
    use std::net::ToSocketAddrs as _;
    let addr = (hostname, port).to_socket_addrs()?.next().unwrap();
    Ok(addr)
}

fn build_config(username: String, password: String, domain: Option<String>) -> connector::Config {
    connector::Config {
        credentials: Credentials::UsernamePassword { username, password },
        domain,
        enable_tls: true,
        enable_credssp: false,
        keyboard_type: KeyboardType::IbmEnhanced,
        keyboard_subtype: 0,
        keyboard_layout: 0,
        keyboard_functional_keys_count: 12,
        ime_file_name: String::new(),
        dig_product_id: String::new(),
        desktop_size: connector::DesktopSize {
            width: 1280,
            height: 1024,
        },
        bitmap: None,
        client_build: 0,
        client_name: "ironrdp".to_owned(),
        client_dir: "C:\\Windows\\System32\\mstscax.dll".to_owned(),

        #[cfg(windows)]
        platform: MajorPlatformType::WINDOWS,
        #[cfg(target_os = "macos")]
        platform: MajorPlatformType::MACINTOSH,
        #[cfg(target_os = "ios")]
        platform: MajorPlatformType::IOS,
        #[cfg(target_os = "linux")]
        platform: MajorPlatformType::UNIX,
        #[cfg(target_os = "android")]
        platform: MajorPlatformType::ANDROID,
        #[cfg(target_os = "freebsd")]
        platform: MajorPlatformType::UNIX,
        #[cfg(target_os = "dragonfly")]
        platform: MajorPlatformType::UNIX,
        #[cfg(target_os = "openbsd")]
        platform: MajorPlatformType::UNIX,
        #[cfg(target_os = "netbsd")]
        platform: MajorPlatformType::UNIX,

        // Disable custom pointers (there is no user interaction anyway)
        no_server_pointer: true,
        autologon: false,
        pointer_software_rendering: true,
        performance_flags: PerformanceFlags::default(),
        desktop_scale_factor: 0,
    }
}

#[cfg_attr(mobile, tauri::mobile_entry_point)]
pub fn run() {
    tauri::Builder::default()
        .manage(RdpState {
            tx: Arc::new(Mutex::new(None)),
        })
        .invoke_handler(tauri::generate_handler![start_rdp_stream, send_mouse_event])
        .run(tauri::generate_context!())
        .expect("error while running tauri application");
}

And I got this error.

error: future cannot be sent between threads safely
   --> src/lib.rs:43:5
    |
43  | /     tokio::spawn(async move {
44  | |         if let Err(e) = run_rdp_session(
45  | |             app_handle, state, hostname, port, username, password, domain,
46  | |         )
...   |
50  | |         }
51  | |     });
    | |______^ future created by async block is not `Send`
    |
    = help: within `{async block@src/lib.rs:43:18: 51:6}`, the trait `Send` is not implemented for `Rc<DecodedPointer>`, which is required by `{async block@src/lib.rs:43:18: 51:6}: Send`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:80:44
    |
72  |     let mut image = DecodedImage::new(
    |         --------- has type `DecodedImage` which is not `Send`
...
80  |         let mut state_tx = state.tx.lock().await;
    |                                            ^^^^^ await occurs here, with `mut image` maybe used later
note: required by a bound in `tokio::spawn`
   --> /home/hj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.40.0/src/task/spawn.rs:167:21
    |
165 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
166 |     where
167 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`

error[E0277]: `dyn AsyncNetworkClient` cannot be sent between threads safely
   --> src/lib.rs:43:5
    |
43  |        tokio::spawn(async move {
    |   _____^____________-
    |  |_____|
    | ||
44  | ||         if let Err(e) = run_rdp_session(
45  | ||             app_handle, state, hostname, port, username, password, domain,
46  | ||         )
...   ||
50  | ||         }
51  | ||     });
    | ||_____-^ `dyn AsyncNetworkClient` cannot be sent between threads safely
    |  |_____|
    |        within this `{async block@src/lib.rs:43:18: 51:6}`
    |
    = help: within `{async block@src/lib.rs:43:18: 51:6}`, the trait `Send` is not implemented for `dyn AsyncNetworkClient`, which is required by `{async block@src/lib.rs:43:18: 51:6}: Send`
    = note: required because it appears within the type `&mut dyn AsyncNetworkClient`
note: required because it appears within the type `Option<&mut dyn AsyncNetworkClient>`
   --> /home/hj/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/option.rs:574:10
    |
574 | pub enum Option<T> {
    |          ^^^^^^
note: required because it's used within this `async` fn body
   --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:47:1
    |
47  | #[instrument(skip_all)]
    | ^^^^^^^^^^^^^^^^^^^^^^^
note: required because it's used within this `async` fn body
   --> src/lib.rs:216:57
    |
216 |   ) -> anyhow::Result<(ConnectionResult, UpgradedFramed)> {
    |  _________________________________________________________^
217 | |     let server_addr = lookup_addr(&server_name, port).context("lookup addr")?;
218 | |
219 | |     let tcp_stream = TcpStream::connect(server_addr)
...   |
259 | |     Ok((connection_result, upgraded_framed))
260 | | }
    | |_^
note: required because it's used within this `async` fn body
   --> src/lib.rs:64:25
    |
64  |   ) -> Result<(), String> {
    |  _________________________^
65  | |     let config = build_config(username, password, Some(domain));
66  | |
67  | |     let (connection_result, mut framed) = connect(config, hostname, port)
...   |
125 | |     Ok(())
126 | | }
    | |_^
note: required because it's used within this `async` block
   --> src/lib.rs:43:18
    |
43  |       tokio::spawn(async move {
    |  __________________^
44  | |         if let Err(e) = run_rdp_session(
45  | |             app_handle, state, hostname, port, username, password, domain,
46  | |         )
...   |
50  | |         }
51  | |     });
    | |_____^
note: required by a bound in `tokio::spawn`
   --> /home/hj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.40.0/src/task/spawn.rs:167:21
    |
165 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
166 |     where
167 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`
    = note: the full name for the type has been written to '/home/hj/multirdp/src-tauri/target/debug/deps/multirdp_lib-4eb0194ff033d779.long-type-150430135109493735.txt'
    = note: consider using `--verbose` to print the full type name to the console
    = note: this error originates in the attribute macro `instrument` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0277]: `dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>` cannot be sent between threads safely
    --> src/lib.rs:43:5
     |
43   | /     tokio::spawn(async move {
44   | |         if let Err(e) = run_rdp_session(
45   | |             app_handle, state, hostname, port, username, password, domain,
46   | |         )
...    |
50   | |         }
51   | |     });
     | |______^ `dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>` cannot be sent between threads safely
     |
     = help: the trait `Send` is not implemented for `dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>`, which is required by `{async block@src/lib.rs:43:18: 51:6}: Send`
     = note: required for `Unique<dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>>` to implement `Send`
note: required because it appears within the type `Box<dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>>`
    --> /home/hj/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:237:12
     |
237  | pub struct Box<
     |            ^^^
note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>>>`
    --> /home/hj/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/pin.rs:1090:12
     |
1090 | pub struct Pin<Ptr> {
     |            ^^^
note: required because it's used within this `async` fn body
    --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:91:35
     |
91   |   ) -> ConnectorResult<ClientState> {
     |  ___________________________________^
92   | |     let mut state = generator.start();
93   | |
94   | |     loop {
...    |
105  | |     }
106  | | }
     | |_^
note: required because it's used within this `async` block
    --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:108:1
     |
108  | #[instrument(level = "trace", skip_all)]
     | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: required because it's used within this `async` fn body
    --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:108:1
     |
108  | #[instrument(level = "trace", skip_all)]
     | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: required because it's used within this `async` block
    --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:47:1
     |
47   | #[instrument(skip_all)]
     | ^^^^^^^^^^^^^^^^^^^^^^^
note: required because it's used within this `async` fn body
    --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:47:1
     |
47   | #[instrument(skip_all)]
     | ^^^^^^^^^^^^^^^^^^^^^^^
note: required because it's used within this `async` fn body
    --> src/lib.rs:216:57
     |
216  |   ) -> anyhow::Result<(ConnectionResult, UpgradedFramed)> {
     |  _________________________________________________________^
217  | |     let server_addr = lookup_addr(&server_name, port).context("lookup addr")?;
218  | |
219  | |     let tcp_stream = TcpStream::connect(server_addr)
...    |
259  | |     Ok((connection_result, upgraded_framed))
260  | | }
     | |_^
note: required because it's used within this `async` fn body
    --> src/lib.rs:64:25
     |
64   |   ) -> Result<(), String> {
     |  _________________________^
65   | |     let config = build_config(username, password, Some(domain));
66   | |
67   | |     let (connection_result, mut framed) = connect(config, hostname, port)
...    |
125  | |     Ok(())
126  | | }
     | |_^
note: required because it's used within this `async` block
    --> src/lib.rs:43:18
     |
43   |       tokio::spawn(async move {
     |  __________________^
44   | |         if let Err(e) = run_rdp_session(
45   | |             app_handle, state, hostname, port, username, password, domain,
46   | |         )
...    |
50   | |         }
51   | |     });
     | |_____^
note: required by a bound in `tokio::spawn`
    --> /home/hj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.40.0/src/task/spawn.rs:167:21
     |
165  |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
     |            ----- required by a bound in this function
166  |     where
167  |         F: Future + Send + 'static,
     |                     ^^^^ required by this bound in `spawn`

First error is solved when I change ironrdp library from Rc to Arc. but i don't have any idea about how can I solve second error "the trait Send is not implemented for dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>"

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions