diff --git a/Cargo.lock b/Cargo.lock index 121700cdd..007fc0da6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1512,6 +1512,7 @@ dependencies = [ "camino", "cfg-if", "ctrlc", + "devolutions-agent-shared", "devolutions-gateway-task", "devolutions-log", "embed-resource", diff --git a/crates/devolutions-agent-shared/src/lib.rs b/crates/devolutions-agent-shared/src/lib.rs index b58f361d0..149c45e25 100644 --- a/crates/devolutions-agent-shared/src/lib.rs +++ b/crates/devolutions-agent-shared/src/lib.rs @@ -45,6 +45,7 @@ pub struct PackageInfoError { pub fn get_installed_agent_version() -> Result, PackageInfoError> { Ok(windows::registry::get_installed_product_version( windows::AGENT_UPDATE_CODE, + windows::registry::ProductVersionEncoding::Agent, )?) } diff --git a/crates/devolutions-agent-shared/src/windows/mod.rs b/crates/devolutions-agent-shared/src/windows/mod.rs index 5b6c2e735..576d7bfed 100644 --- a/crates/devolutions-agent-shared/src/windows/mod.rs +++ b/crates/devolutions-agent-shared/src/windows/mod.rs @@ -17,7 +17,13 @@ pub const GATEWAY_UPDATE_CODE: Uuid = uuid!("{db3903d6-c451-4393-bd80-eb9f45b902 /// /// See [`GATEWAY_UPDATE_CODE`] for more information on update codes. pub const AGENT_UPDATE_CODE: Uuid = uuid!("{82318d3c-811f-4d5d-9a82-b7c31b076755}"); + /// MSI upgrade code for the Devolutions Hub Service. /// /// See [`GATEWAY_UPDATE_CODE`] for more information on update codes. pub const HUB_SERVICE_UPDATE_CODE: Uuid = uuid!("{f437046e-8e13-430a-8c8f-29fcb9023b59}"); + +/// MSI upgrade code for the Remote Desktop Manager. +/// +/// See [`GATEWAY_UPDATE_CODE`] for more information on update codes. +pub const RDM_UPDATE_CODE: Uuid = uuid!("{2707F3BF-4D7B-40C2-882F-14B0ED869EE8}"); diff --git a/crates/devolutions-agent-shared/src/windows/registry.rs b/crates/devolutions-agent-shared/src/windows/registry.rs index 041f3a72d..400c4b283 100644 --- a/crates/devolutions-agent-shared/src/windows/registry.rs +++ b/crates/devolutions-agent-shared/src/windows/registry.rs @@ -48,9 +48,17 @@ pub fn get_product_code(update_code: Uuid) -> Result, RegistryError Ok(Some(reversed_hex_to_uuid(&product_code)?)) } +pub enum ProductVersionEncoding { + Agent, + Rdm, +} + /// Get the installed version of a product using Windows registry. Returns `None` if the product /// is not installed. -pub fn get_installed_product_version(update_code: Uuid) -> Result, RegistryError> { +pub fn get_installed_product_version( + update_code: Uuid, + version_encoding: ProductVersionEncoding, +) -> Result, RegistryError> { let product_code_uuid = match get_product_code(update_code)? { Some(uuid) => uuid, None => return Ok(None), @@ -79,7 +87,14 @@ pub fn get_installed_product_version(update_code: Uuid) -> Result> 24) + 2000; + // The high byte encodes the year as an offset: + // - Agent builds use a base year of 2000 (year = high_byte + 2000). + // - RDM MSI packages use 0x700 (1792) as base, found empirically. + // This offset must be preserved to correctly decode existing RDM installations. + let short_year = match version_encoding { + ProductVersionEncoding::Agent => (product_version >> 24) + 2000, + ProductVersionEncoding::Rdm => (product_version >> 24) + 0x700, + }; let month = (product_version >> 16) & 0xFF; let day = product_version & 0xFFFF; @@ -91,3 +106,36 @@ pub fn get_installed_product_version(update_code: Uuid) -> Result Result, RegistryError> { + let product_code_uuid = match get_product_code(update_code)? { + Some(uuid) => uuid, + None => return Ok(None), + } + .braced(); + + let key_path = format!("{REG_CURRENT_VERSION}\\Uninstall\\{product_code_uuid}"); + + const INSTALL_LOCATION_VALUE_NAME: &str = "InstallLocation"; + + // Now we know the product code of installed MSI, we could read its install location. + let product_tree = windows_registry::LOCAL_MACHINE + .open(&key_path) + .map_err(|source| RegistryError::OpenKey { + key: key_path.clone(), + source, + })?; + + let install_location: String = product_tree + .get_value(INSTALL_LOCATION_VALUE_NAME) + .and_then(TryInto::try_into) + .map_err(|source| RegistryError::ReadValue { + value: INSTALL_LOCATION_VALUE_NAME.to_owned(), + key: key_path.clone(), + source, + })?; + + Ok(Some(install_location)) +} diff --git a/crates/win-api-wrappers/src/event.rs b/crates/win-api-wrappers/src/event.rs index 83cff9b61..eb805cadb 100644 --- a/crates/win-api-wrappers/src/event.rs +++ b/crates/win-api-wrappers/src/event.rs @@ -1,9 +1,12 @@ use std::sync::Arc; -use windows::Win32::Foundation::HANDLE; -use windows::Win32::System::Threading::{CreateEventW, SetEvent}; +use anyhow::bail; +use windows::Win32::Foundation::{HANDLE, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT}; +use windows::Win32::System::Threading::{CreateEventW, INFINITE, SetEvent, WaitForSingleObject}; +use crate::Error; use crate::handle::Handle; +use crate::utils::WideString; /// RAII wrapper for WinAPI event handle. #[derive(Debug, Clone)] @@ -24,6 +27,27 @@ impl Event { }) } + pub fn new_named(name: &str, manual_reset: bool, initial_state: bool) -> anyhow::Result { + let name_wide = WideString::from(name); + + // SAFETY: name_wide is a valid null-terminated UTF-16 string + let raw_handle = unsafe { + CreateEventW( + None, // Default security + manual_reset, // Manual or auto-reset + initial_state, // Initially signaled or not + name_wide.as_pcwstr(), // Event name + ) + }?; + + // SAFETY: `CreateEventW` always returns a valid handle on success. + let handle = unsafe { Handle::new_owned(raw_handle) }?; + + Ok(Self { + handle: Arc::new(handle), + }) + } + pub fn raw(&self) -> HANDLE { self.handle.raw() } @@ -35,4 +59,16 @@ impl Event { } Ok(()) } + + pub fn wait(&self, timeout_ms: Option) -> anyhow::Result<()> { + // SAFETY: No preconditions. + let status = unsafe { WaitForSingleObject(self.handle.raw(), timeout_ms.unwrap_or(INFINITE)) }; + + match status { + WAIT_OBJECT_0 => Ok(()), + WAIT_TIMEOUT => bail!("Timeout waiting for event"), + WAIT_FAILED => bail!(Error::last_error()), + _ => bail!("Unexpected wait result"), + } + } } diff --git a/crates/win-api-wrappers/src/process.rs b/crates/win-api-wrappers/src/process.rs index f65ed82fd..06dc69f5c 100644 --- a/crates/win-api-wrappers/src/process.rs +++ b/crates/win-api-wrappers/src/process.rs @@ -24,10 +24,10 @@ use windows::Win32::System::LibraryLoader::{ use windows::Win32::System::RemoteDesktop::ProcessIdToSessionId; use windows::Win32::System::Threading::{ CREATE_UNICODE_ENVIRONMENT, CreateProcessAsUserW, CreateRemoteThread, EXTENDED_STARTUPINFO_PRESENT, - GetCurrentProcess, GetExitCodeProcess, INFINITE, LPPROC_THREAD_ATTRIBUTE_LIST, LPTHREAD_START_ROUTINE, OpenProcess, - OpenProcessToken, PEB, PROCESS_ACCESS_RIGHTS, PROCESS_BASIC_INFORMATION, PROCESS_CREATION_FLAGS, - PROCESS_INFORMATION, PROCESS_NAME_WIN32, PROCESS_TERMINATE, QueryFullProcessImageNameW, STARTUPINFOEXW, - STARTUPINFOW, STARTUPINFOW_FLAGS, TerminateProcess, WaitForSingleObject, + GetCurrentProcess, GetCurrentProcessId, GetExitCodeProcess, INFINITE, LPPROC_THREAD_ATTRIBUTE_LIST, + LPTHREAD_START_ROUTINE, OpenProcess, OpenProcessToken, PEB, PROCESS_ACCESS_RIGHTS, PROCESS_BASIC_INFORMATION, + PROCESS_CREATION_FLAGS, PROCESS_INFORMATION, PROCESS_NAME_WIN32, PROCESS_TERMINATE, QueryFullProcessImageNameW, + STARTUPINFOEXW, STARTUPINFOW, STARTUPINFOW_FLAGS, TerminateProcess, WaitForSingleObject, }; use windows::Win32::UI::Shell::{SEE_MASK_NOCLOSEPROCESS, SHELLEXECUTEINFOW, ShellExecuteExW}; use windows::Win32::UI::WindowsAndMessaging::{ @@ -902,13 +902,21 @@ fn terminate_process_by_name_impl(process_name: &str, session_id: Option) - Ok(false) } -fn process_id_to_session(pid: u32) -> Result { +/// Get the Windows session ID for a given process ID. +pub fn process_id_to_session(pid: u32) -> Result { let mut session_id = 0; // SAFETY: `session_id` is always pointing to a valid memory location. unsafe { ProcessIdToSessionId(pid, &mut session_id as *mut _) }?; Ok(session_id) } +/// Get the current Windows session ID. +pub fn get_current_session_id() -> Result { + // SAFETY: FFI call with no outstanding preconditions. + let process_id = unsafe { GetCurrentProcessId() }; + process_id_to_session(process_id) +} + struct EnumWindowsContext { expected_pid: u32, threads: Vec, diff --git a/devolutions-agent/src/updater/detect.rs b/devolutions-agent/src/updater/detect.rs index bb1707200..eaab03eaf 100644 --- a/devolutions-agent/src/updater/detect.rs +++ b/devolutions-agent/src/updater/detect.rs @@ -9,10 +9,12 @@ use crate::updater::{Product, UpdaterError}; pub(crate) fn get_installed_product_version(product: Product) -> Result, UpdaterError> { match product { Product::Gateway => { - registry::get_installed_product_version(GATEWAY_UPDATE_CODE).map_err(UpdaterError::WindowsRegistry) + registry::get_installed_product_version(GATEWAY_UPDATE_CODE, registry::ProductVersionEncoding::Agent) + .map_err(UpdaterError::WindowsRegistry) } Product::HubService => { - registry::get_installed_product_version(HUB_SERVICE_UPDATE_CODE).map_err(UpdaterError::WindowsRegistry) + registry::get_installed_product_version(HUB_SERVICE_UPDATE_CODE, registry::ProductVersionEncoding::Agent) + .map_err(UpdaterError::WindowsRegistry) } } } diff --git a/devolutions-session/Cargo.toml b/devolutions-session/Cargo.toml index 1bb1d7170..8e36c6e71 100644 --- a/devolutions-session/Cargo.toml +++ b/devolutions-session/Cargo.toml @@ -47,6 +47,9 @@ optional = true version = "0.4.2" features = ["std"] +[target.'cfg(windows)'.dependencies] +devolutions-agent-shared = { path = "../crates/devolutions-agent-shared" } + [target.'cfg(windows)'.build-dependencies] embed-resource = "3.0" @@ -60,4 +63,4 @@ features = [ "Win32_UI_Shell", "Win32_System_Console", "Win32_UI_Input_KeyboardAndMouse", -] +] \ No newline at end of file diff --git a/devolutions-session/src/dvc/io.rs b/devolutions-session/src/dvc/io.rs index e7ee8aa44..87b871223 100644 --- a/devolutions-session/src/dvc/io.rs +++ b/devolutions-session/src/dvc/io.rs @@ -9,7 +9,7 @@ use win_api_wrappers::wts::WtsVirtualChannel; use windows::Win32::Foundation::{ERROR_IO_PENDING, GetLastError, WAIT_EVENT, WAIT_OBJECT_0}; use windows::Win32::Storage::FileSystem::{ReadFile, WriteFile}; use windows::Win32::System::IO::{GetOverlappedResult, OVERLAPPED}; -use windows::Win32::System::RemoteDesktop::{CHANNEL_CHUNK_LENGTH, CHANNEL_PDU_HEADER}; +use windows::Win32::System::RemoteDesktop::CHANNEL_PDU_HEADER; use windows::Win32::System::Threading::{INFINITE, WaitForMultipleObjects}; use crate::dvc::channel::WinapiSignaledReceiver; @@ -31,7 +31,9 @@ pub fn run_dvc_io( trace!("DVC channel opened"); - let mut pdu_chunk_buffer = [0u8; CHANNEL_CHUNK_LENGTH as usize]; + // All DVC messages should be under CHANNEL_CHUNK_LENGTH size, but sometimes RDP stack + // sends a few messages together; 128Kb buffer should be enough to hold a few dozen messages. + let mut pdu_chunk_buffer = [0u8; 128 * 1024]; let mut overlapped = OVERLAPPED::default(); let mut bytes_read: u32 = 0; @@ -111,7 +113,6 @@ pub fn run_dvc_io( } } } - // Prepare async read file operation one more time. // SAFETY: No preconditions. let result = diff --git a/devolutions-session/src/dvc/mod.rs b/devolutions-session/src/dvc/mod.rs index d0fc90282..5e3cd4fe7 100644 --- a/devolutions-session/src/dvc/mod.rs +++ b/devolutions-session/src/dvc/mod.rs @@ -38,6 +38,7 @@ pub mod fs; pub mod io; pub mod now_message_dissector; pub mod process; +pub mod rdm; pub mod task; mod env; diff --git a/devolutions-session/src/dvc/rdm.rs b/devolutions-session/src/dvc/rdm.rs new file mode 100644 index 000000000..be96e88ee --- /dev/null +++ b/devolutions-session/src/dvc/rdm.rs @@ -0,0 +1,763 @@ +use std::mem::size_of; +use std::sync::Arc; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use anyhow::{Context, bail}; +use devolutions_agent_shared::windows::RDM_UPDATE_CODE; +use devolutions_agent_shared::windows::registry::{ + ProductVersionEncoding, get_install_location, get_installed_product_version, +}; +use now_proto_pdu::ironrdp_core::{Encode, WriteCursor}; +use now_proto_pdu::{ + NowChannelCapsetMsg, NowMessage, NowProtoVersion, NowRdmAppNotifyMsg, NowRdmAppStartMsg, NowRdmAppState, + NowRdmCapabilitiesMsg, NowRdmMessage, NowRdmReason, +}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeClient}; +use tracing::{error, info, trace, warn}; +use win_api_wrappers::event::Event; +use win_api_wrappers::handle::Handle; +use win_api_wrappers::process::{Process, ProcessEntry32Iterator, get_current_session_id}; +use win_api_wrappers::utils::WideString; +use windows::Win32::Foundation::ERROR_ALREADY_EXISTS; +use windows::Win32::System::Threading::{ + CREATE_NEW_CONSOLE, CREATE_NEW_PROCESS_GROUP, CreateProcessW, NORMAL_PRIORITY_CLASS, PROCESS_INFORMATION, + PROCESS_QUERY_INFORMATION, STARTUPINFOW, +}; +use windows::Win32::UI::WindowsAndMessaging::{SW_MAXIMIZE, SW_RESTORE}; +use windows::core::{PCWSTR, PWSTR}; + +use crate::dvc::channel::WinapiSignaledSender; +use crate::dvc::now_message_dissector::NowMessageDissector; + +const PIPE_READ_BUFFER_SIZE: usize = 4096; + +/// Generate session-specific RDM agent pipe name. +/// +/// Format: `\\.\pipe\devolutions-session-{session_id}` +fn get_rdm_pipe_name() -> anyhow::Result { + let session_id = get_current_session_id().context("Failed to get current session ID")?; + Ok(format!(r"\\.\pipe\devolutions-session-{}", session_id)) +} + +/// Generate session-specific RDM ready event name. +/// +/// Format: `Global\devolutions-session-{session_id}-ready` +/// This event is created by devolutions-session and signaled by RDM when ready. +fn get_rdm_ready_event_name() -> anyhow::Result { + let session_id = get_current_session_id().context("Failed to get current session ID")?; + Ok(format!(r"Global\devolutions-session-{}-ready", session_id)) +} + +/// Create or open the RDM ready event +/// +/// Creates a named event that RDM will signal when its pipe server is ready. +/// If RDM already created the event (it was launched before us), we open it instead. +fn create_or_open_rdm_ready_event() -> anyhow::Result { + let event_name = get_rdm_ready_event_name()?; + + info!(event_name, "Creating or opening RDM ready event"); + + // Check if event already existed after creation + // Use auto-reset event (false) so it automatically resets after one waiter is released + let event = Event::new_named(&event_name, false, false)?; + #[allow(clippy::cast_possible_wrap)] + let already_exists = win_api_wrappers::Error::last_error().code() == ERROR_ALREADY_EXISTS.0 as i32; + + if already_exists { + info!(event_name, "RDM ready event already exists (RDM was launched first)"); + } else { + info!(event_name, "Created new RDM ready event"); + } + + Ok(event) +} + +/// RDM named pipe connection for message passthrough +pub struct RdmPipeConnection { + pipe: NamedPipeClient, + pipe_name: String, + dissector: NowMessageDissector, +} + +impl RdmPipeConnection { + /// Connect to RDM named pipe after waiting for ready event + /// + /// Waits for RDM ready event to be signaled, then connects to the pipe. + /// The ready event is dropped after connection, allowing RDM to own it. + async fn connect(timeout_secs: u32, ready_event: Event) -> anyhow::Result { + let pipe_name = get_rdm_pipe_name()?; + let timeout_ms = timeout_secs.saturating_mul(1000); + + info!(pipe_name, timeout_secs, "Waiting for RDM and connecting to pipe"); + + // Wait for RDM to signal it's ready (pipe server is listening) + // Transfer ownership to spawn_blocking - event dropped after wait completes + tokio::task::spawn_blocking(move || -> anyhow::Result<()> { + ready_event.wait(Some(timeout_ms))?; + info!("RDM ready event signaled successfully"); + Ok(()) + }) + .await + .context("Task join error")? + .context("Failed to wait for RDM ready event")?; + + trace!("RDM ready event signaled, attempting to connect to pipe"); + + // Retry connection with exponential backoff + // RDM event was signaled, but pipe server might need a moment to accept connections + const MAX_ATTEMPTS: usize = 10; + const INITIAL_DELAY_MS: u64 = 50; + + let mut delay_ms = INITIAL_DELAY_MS; + let mut attempt = 0; + + let pipe_error = loop { + match ClientOptions::new().open(&pipe_name) { + Ok(pipe_client) => { + info!(pipe_name, attempt, "Successfully connected to RDM pipe"); + return Ok(Self { + pipe: pipe_client, + pipe_name, + dissector: NowMessageDissector::default(), + }); + } + Err(error) => { + tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; + delay_ms = (delay_ms * 2).min(500); // Cap at 500ms + attempt += 1; + if attempt >= MAX_ATTEMPTS { + break error; + } + } + } + }; + Err(pipe_error).context("RDM host pipe is unresponsive") + } + + async fn send_message(&mut self, message: &NowMessage<'_>) -> anyhow::Result<()> { + let size = message.size(); + let mut buffer = vec![0u8; size]; + + { + let mut cursor = WriteCursor::new(&mut buffer); + message.encode(&mut cursor).context("Failed to encode message")?; + } + + self.pipe + .write_all(&buffer) + .await + .context("Failed to send message to RDM pipe")?; + + Ok(()) + } + + async fn read_messages(&mut self) -> anyhow::Result>> { + let mut buffer = vec![0u8; PIPE_READ_BUFFER_SIZE]; + + loop { + let bytes_read = self + .pipe + .read(&mut buffer) + .await + .context("Failed to read from RDM pipe")?; + + if bytes_read == 0 { + bail!("RDM pipe closed"); + } + + let messages = self + .dissector + .dissect(&buffer[..bytes_read]) + .context("Failed to dissect message from RDM")?; + + if !messages.is_empty() { + trace!(count = messages.len(), "Read messages from RDM pipe"); + return Ok(messages); + } + + // Need more data, continue reading + } + } + + fn pipe_name(&self) -> &str { + &self.pipe_name + } +} + +fn validate_capset_response(message: NowMessage<'_>) -> anyhow::Result { + match message { + NowMessage::Channel(now_proto_pdu::NowChannelMessage::Capset(caps)) => { + if caps.version().major != NowProtoVersion::CURRENT.major { + bail!( + "Incompatible protocol version: expected major version {}, got {}.{}", + NowProtoVersion::CURRENT.major, + caps.version().major, + caps.version().minor + ); + } + + Ok(caps) + } + _ => { + bail!("Expected capset message, got: {:?}", message); + } + } +} + +/// Perform NOW protocol negotiation with RDM +/// +/// Sends the agent's proposed capabilities to RDM and receives RDM's negotiated +/// capabilities for the connection (a final set that may be a downgraded subset +/// of the agent's proposal and that both sides will use). +/// This establishes the protocol version and capabilities for the connection. +async fn negotiate_with_rdm( + pipe: &mut RdmPipeConnection, + agent_caps: &NowChannelCapsetMsg, +) -> anyhow::Result { + info!(pipe_name = %pipe.pipe_name(), "Starting NOW protocol negotiation with RDM"); + + let caps_msg: NowMessage<'_> = agent_caps.clone().into(); + pipe.send_message(&caps_msg) + .await + .context("Failed to send capabilities to RDM")?; + + info!("Sent agent capabilities to RDM, waiting for response"); + + let messages = pipe + .read_messages() + .await + .context("Failed to read RDM capabilities response")?; + + let rdm_caps_msg = messages + .into_iter() + .next() + .context("No capset response received from RDM")?; + + let rdm_caps = validate_capset_response(rdm_caps_msg).context("Invalid capset response from RDM")?; + + info!( + rdm_version = ?rdm_caps.version(), + "Negotiation successful with RDM" + ); + + Ok(rdm_caps) +} + +/// Bidirectional message passthrough between RDM pipe and DVC channel +/// for RDM messages. +/// +/// Handles both directions: +/// - RDM→DVC: Reads from RDM pipe, forwards to NowAgent DVC channel +/// - DVC→RDM: Receives from NowAgent DVC channel, writes to RDM pipe. +/// +/// Uses tokio::select! to multiplex between reading from pipe and receiving from channel. +/// Intercepts AppNotify messages to track connection state. +async fn run_rdm_to_dvc_passthrough( + mut pipe: RdmPipeConnection, + mut dvc_rx: tokio::sync::mpsc::Receiver>, + dvc_tx: WinapiSignaledSender>, + connection_state: Arc, +) -> anyhow::Result<()> { + let pipe_name = pipe.pipe_name().to_owned(); + info!(pipe_name, "Starting bidirectional RDM pipe passthrough"); + + loop { + tokio::select! { + // Read messages from RDM pipe and forward to DVC + result = pipe.read_messages() => { + let messages = match result { + Ok(msgs) => msgs, + Err(error) => { + error!(%error, pipe_name, "Failed to read message from RDM"); + connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release); + break; + } + }; + // Process all messages from this read + for message in messages { + info!(pipe_name, "Received message from RDM: {:?}", message); + + // Intercept AppNotify to track state + if let NowMessage::Rdm(NowRdmMessage::AppNotify(ref notify)) = message { + let app_state = notify.app_state(); + info!(?app_state, "Intercepted AppNotify from RDM"); + + let new_state = match app_state { + NowRdmAppState::READY => Some(RdmConnectionState::Ready), + NowRdmAppState::FAILED => Some(RdmConnectionState::Disconnected), + _ => None, + }; + + if let Some(new_state) = new_state { + // Update state atomically + connection_state.store(new_state.as_u8(), Ordering::Release); + info!(pipe_name, ?new_state, "Updated RDM connection state"); + } + } + + // Forward message to DVC + if let Err(error) = dvc_tx.send(message).await { + error!(%error, pipe_name, "Failed to send message to DVC"); + return Ok(()); + } + } + } + + // Receive messages from channel and write to RDM pipe + message_opt = dvc_rx.recv() => { + match message_opt { + Some(message) => { + info!(pipe_name, "Forwarding message to RDM: {:?}", message); + if let Err(error) = pipe.send_message(&message).await { + error!(%error, pipe_name, "Failed to send message to RDM pipe"); + connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release); + break; + } + } + None => { + info!(pipe_name, "DVC receiver channel closed; terminating RDM passthrough task"); + break; + } + } + } + } + } + + info!(pipe_name, "Bidirectional RDM passthrough task terminated"); + + Ok(()) +} + +/// RDM connection state +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +enum RdmConnectionState { + /// Not connected to RDM + Disconnected = 0, + /// Connection in progress + Connecting = 1, + /// Connected and ready (received READY notification from RDM) + Ready = 2, +} + +impl RdmConnectionState { + fn from_u8(value: u8) -> Self { + match value { + 0 => Self::Disconnected, + 1 => Self::Connecting, + 2 => Self::Ready, + _ => Self::Disconnected, // Default to disconnected for invalid values + } + } + + fn as_u8(self) -> u8 { + self as u8 + } +} + +/// Manages DVC <-> RDM pipe communication/state and message routing. +pub struct RdmMessageProcessor { + dvc_tx: WinapiSignaledSender>, + rdm_tx: Option>>, + connection_state: Arc, +} + +impl RdmMessageProcessor { + /// Create a new RDM handler + pub fn new(dvc_tx: WinapiSignaledSender>) -> Self { + Self { + dvc_tx, + rdm_tx: None, + connection_state: Arc::new(AtomicU8::new(RdmConnectionState::Disconnected.as_u8())), + } + } + + /// Process RDM capabilities message + /// + /// This is the only RDM message handled by the agent and not passed to DVC. + /// It checks if RDM is installed and returns version information along with + /// timestamp synchronization. + pub async fn process_capabilities( + &self, + rdm_caps_msg: NowRdmCapabilitiesMsg<'_>, + ) -> anyhow::Result> { + let client_timestamp = rdm_caps_msg.timestamp(); + let server_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .context("Failed to get current timestamp")? + .as_secs(); + + info!( + client_timestamp, + server_timestamp, "Processing RDM capabilities message" + ); + + let (is_rdm_available, rdm_version) = { + match get_installed_product_version(RDM_UPDATE_CODE, ProductVersionEncoding::Rdm) { + Ok(Some(date_version)) => { + info!(version = %date_version, "RDM installation found via MSI registry"); + (true, date_version.to_string()) + } + Ok(None) => { + info!("RDM not found in MSI registry"); + (false, String::new()) + } + Err(error) => { + warn!(%error, "Failed to check RDM via MSI registry"); + (false, String::new()) + } + } + }; + + // Create response message with server timestamp + let mut response = NowRdmCapabilitiesMsg::new(server_timestamp, rdm_version) + .context("Failed to create RDM capabilities response")?; + + if is_rdm_available { + response = response.with_app_available(); + info!("RDM application is available on system"); + } else { + info!("RDM application is not available"); + } + + Ok(NowMessage::Rdm(NowRdmMessage::Capabilities(response))) + } + + /// Process RDM app start message: + /// - If RDM is already connected and ready, send immediate READY notification. + /// - If RDM connection is in progress, ignore duplicate app start. + /// - If RDM is not started, launch RDM and start connection process. + /// - Spawns a background task to handle the connection process. + pub fn process_app_start(&mut self, rdm_app_start_msg: NowRdmAppStartMsg, agent_caps: NowChannelCapsetMsg) { + let mut current_state = RdmConnectionState::from_u8(self.connection_state.load(Ordering::Acquire)); + + // Ensure that the transition from Disconnected to Connecting is done atomically + // so that only one task is spawned to handle app_start. + loop { + match current_state { + RdmConnectionState::Ready => { + info!("RDM already connected and ready, sending immediate READY notification"); + let dvc_tx = self.dvc_tx.clone(); + tokio::spawn(async move { + if let Err(error) = + send_rdm_app_notify(&dvc_tx, NowRdmAppState::READY, NowRdmReason::NOT_SPECIFIED).await + { + error!(%error, "Failed to send immediate RDM READY notification"); + } + }); + return; + } + RdmConnectionState::Connecting => { + info!("RDM connection already in progress, ignoring duplicate app_start"); + return; + } + RdmConnectionState::Disconnected => { + info!("Starting RDM connection process"); + let disconnected = RdmConnectionState::Disconnected.as_u8(); + let connecting = RdmConnectionState::Connecting.as_u8(); + + match self.connection_state.compare_exchange( + disconnected, + connecting, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Successfully claimed responsibility for starting the connection. + break; + } + Err(actual) => { + // State changed concurrently; re-evaluate the new state. + current_state = RdmConnectionState::from_u8(actual); + continue; + } + } + } + } + } + + let dvc_tx = self.dvc_tx.clone(); + let connection_state = Arc::clone(&self.connection_state); + + // Use bounded channel to prevent unbounded memory growth (capacity: 100 messages) + let (rdm_tx, rdm_rx) = tokio::sync::mpsc::channel(100); + self.rdm_tx = Some(rdm_tx); + + tokio::spawn(async move { + if let Err(error) = process_app_start_impl( + rdm_app_start_msg, + agent_caps, + dvc_tx, + Arc::clone(&connection_state), + rdm_rx, + ) + .await + { + error!(%error, "RDM app start failed"); + // Ensure connection_state is reset so future app_start attempts are possible + connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release); + } + }); + } + + /// Forward RDM message to RDM via pipe + pub async fn forward_message(&mut self, message: NowRdmMessage<'static>) -> anyhow::Result<()> { + let current_state = RdmConnectionState::from_u8(self.connection_state.load(Ordering::Acquire)); + + match current_state { + RdmConnectionState::Ready => { + if let Some(rdm_tx) = &self.rdm_tx { + let now_msg: NowMessage<'static> = NowMessage::Rdm(message); + rdm_tx + .send(now_msg) + .await + .context("Failed to send message to RDM channel")?; + Ok(()) + } else { + warn!("RDM state is Ready but channel is not available"); + bail!("RDM channel not available"); + } + } + RdmConnectionState::Connecting => { + warn!("Cannot forward message: RDM connection is still in progress"); + bail!("RDM connection in progress"); + } + RdmConnectionState::Disconnected => { + warn!("Cannot forward message: RDM is not connected"); + bail!("RDM not connected"); + } + } + } +} + +/// Implementation of RDM app start logic (runs in background task) +async fn process_app_start_impl( + rdm_app_start_msg: NowRdmAppStartMsg, + agent_caps: NowChannelCapsetMsg, + dvc_tx: WinapiSignaledSender>, + connection_state: Arc, + rdm_rx: tokio::sync::mpsc::Receiver>, +) -> anyhow::Result<()> { + info!("Processing RDM app start message"); + + // Create or open the ready event that RDM will signal + let ready_event = create_or_open_rdm_ready_event().context("Failed to create/open RDM ready event")?; + + // Launch RDM if not already running + if !is_rdm_running() { + info!("RDM is not running, launching..."); + match launch_rdm_process(&rdm_app_start_msg).await { + Ok(process_id) => { + info!( + "RDM application started successfully with PID: {} (detached)", + process_id + ); + } + Err(error) => { + error!(%error, "Failed to launch RDM application"); + send_rdm_app_notify(&dvc_tx, NowRdmAppState::FAILED, NowRdmReason::STARTUP_FAILURE).await?; + return Err(error); + } + } + } else { + info!("RDM application is already running"); + } + + // Connect to RDM pipe with timeout + let mut pipe = match RdmPipeConnection::connect(rdm_app_start_msg.timeout(), ready_event).await { + Ok(pipe) => { + info!("Connected to RDM pipe successfully"); + pipe + } + Err(error) => { + error!(%error, "Failed to connect to RDM pipe"); + send_rdm_app_notify(&dvc_tx, NowRdmAppState::FAILED, NowRdmReason::LAUNCH_TIMEOUT).await?; + return Err(error).context("Failed to connect to RDM pipe"); + } + }; + + // Perform negotiation + match negotiate_with_rdm(&mut pipe, &agent_caps).await { + Ok(_rdm_caps) => { + info!("Negotiation with RDM successful"); + + // Passthrough original app start message to RDM. + let app_start_msg: NowMessage<'_> = NowMessage::Rdm(NowRdmMessage::AppStart(rdm_app_start_msg)); + pipe.send_message(&app_start_msg) + .await + .context("Failed to send app start message to RDM")?; + + trace!("Sent app start message to RDM"); + + // Start passthrough task with pipe and channel receiver + // RDM will send READY notification after negotiation completes + start_passthrough_task(pipe, rdm_rx, dvc_tx, connection_state).await?; + + Ok(()) + } + Err(error) => { + error!(%error, "Failed to negotiate with RDM"); + send_rdm_app_notify(&dvc_tx, NowRdmAppState::FAILED, NowRdmReason::STARTUP_FAILURE).await?; + Err(error).context("Failed to negotiate with RDM") + } + } +} + +/// Start the passthrough task to forward messages bidirectionally +async fn start_passthrough_task( + pipe: RdmPipeConnection, + rdm_rx: tokio::sync::mpsc::Receiver>, + dvc_tx: WinapiSignaledSender>, + connection_state: Arc, +) -> anyhow::Result<()> { + tokio::spawn(async move { + if let Err(error) = run_rdm_to_dvc_passthrough(pipe, rdm_rx, dvc_tx, connection_state).await { + error!(%error, "RDM passthrough task failed"); + } + }); + + Ok(()) +} + +/// Launch RDM process with specified options (detached) +async fn launch_rdm_process(rdm_app_start_msg: &NowRdmAppStartMsg) -> anyhow::Result { + let rdm_exe_path = get_rdm_executable_path().context("RDM is not installed")?; + + let install_location = rdm_exe_path + .parent() + .context("Failed to get RDM installation directory")? + .to_string_lossy() + .to_string(); + + // Convert command line to wide string + let current_dir = WideString::from(&install_location); + + info!( + exe_path = %rdm_exe_path.display(), + fullscreen = rdm_app_start_msg.is_fullscreen(), + maximized = rdm_app_start_msg.is_maximized(), + jump_mode = rdm_app_start_msg.is_jump_mode(), + "Starting RDM application with CreateProcess" + ); + + // Create process using CreateProcessW + #[allow(clippy::cast_possible_truncation)] // STARTUPINFOW.cb is u32 by design + let startup_info = STARTUPINFOW { + cb: size_of::() as u32, + #[allow(clippy::cast_possible_truncation)] // wShowWindow is u16 by design + wShowWindow: if rdm_app_start_msg.is_maximized() { + SW_MAXIMIZE.0 as u16 + } else { + SW_RESTORE.0 as u16 + }, + dwFlags: windows::Win32::System::Threading::STARTF_USESHOWWINDOW, + ..Default::default() + }; + + let mut process_info = PROCESS_INFORMATION::default(); + + // Create a mutable copy of the command line for CreateProcessW + let mut command_line_buffer: Vec = format!("\"{}\"", rdm_exe_path.display()) + .encode_utf16() + .chain(std::iter::once(0)) + .collect(); + + // SAFETY: All pointers are valid and properly initialized + let success = unsafe { + CreateProcessW( + None, + Some(PWSTR(command_line_buffer.as_mut_ptr())), + None, + None, + false, + NORMAL_PRIORITY_CLASS | CREATE_NEW_PROCESS_GROUP | CREATE_NEW_CONSOLE, + None, + PCWSTR(current_dir.as_pcwstr().as_ptr()), + &startup_info, + &mut process_info, + ) + }; + + if success.is_err() || process_info.hProcess.is_invalid() { + return Err(win_api_wrappers::Error::last_error().into()); + } + + // Close handles as we're launching detached (no need to wait) + // SAFETY: It is safe to create owned handle wrapper from created process thread handle + let _ = unsafe { Handle::new(process_info.hThread, true) }; + // SAFETY: It is safe to create owned handle wrapper from created process handle + let _ = unsafe { Handle::new(process_info.hProcess, true) }; + + Ok(process_info.dwProcessId) +} + +/// Check if RDM is already running +fn is_rdm_running() -> bool { + let rdm_exe_path = match get_rdm_executable_path() { + Some(path) => path, + None => return false, + }; + + let process_iterator = match ProcessEntry32Iterator::new() { + Ok(iter) => iter, + Err(error) => { + warn!(%error, "Failed to create process iterator for RDM detection"); + return false; + } + }; + + for process_entry in process_iterator { + let pid = process_entry.process_id(); + + let process = match Process::get_by_pid(pid, PROCESS_QUERY_INFORMATION) { + Ok(proc) => proc, + Err(_) => continue, + }; + + let exe_path = match process.exe_path() { + Ok(path) => path, + Err(_) => continue, + }; + + // Compare the full paths case-insensitively + if exe_path + .to_string_lossy() + .eq_ignore_ascii_case(&rdm_exe_path.to_string_lossy()) + { + info!( + rdm_path = %rdm_exe_path.display(), + found_path = %exe_path.display(), + "Found already running RDM process" + ); + return true; + } + } + false +} + +/// Get the RDM executable path from installation location +fn get_rdm_executable_path() -> Option { + match get_install_location(RDM_UPDATE_CODE) { + Ok(Some(install_location)) => { + let rdm_exe_path = std::path::Path::new(&install_location).join("RemoteDesktopManager.exe"); + Some(rdm_exe_path) + } + Ok(None) => None, + Err(_) => None, + } +} + +/// Send RDM app notification message +async fn send_rdm_app_notify( + dvc_tx: &WinapiSignaledSender>, + state: NowRdmAppState, + reason: NowRdmReason, +) -> anyhow::Result<()> { + info!(?state, ?reason, "Sending RDM app state notification"); + + let message = NowRdmAppNotifyMsg::new(state, reason); + dvc_tx.send(NowMessage::Rdm(NowRdmMessage::AppNotify(message))).await?; + Ok(()) +} diff --git a/devolutions-session/src/dvc/task.rs b/devolutions-session/src/dvc/task.rs index ec8e890e3..0ac38d1d2 100644 --- a/devolutions-session/src/dvc/task.rs +++ b/devolutions-session/src/dvc/task.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{Context, bail}; use async_trait::async_trait; @@ -9,14 +8,15 @@ use now_proto_pdu::{ ComApartmentStateKind, NowChannelCapsetMsg, NowChannelCloseMsg, NowChannelHeartbeatMsg, NowChannelMessage, NowExecBatchMsg, NowExecCancelRspMsg, NowExecCapsetFlags, NowExecDataMsg, NowExecDataStreamKind, NowExecMessage, NowExecProcessMsg, NowExecPwshMsg, NowExecResultMsg, NowExecRunMsg, NowExecStartedMsg, NowExecWinPsMsg, NowMessage, - NowMsgBoxResponse, NowProtoError, NowProtoVersion, NowRdmCapabilitiesMsg, NowRdmMessage, NowSessionCapsetFlags, - NowSessionMessage, NowSessionMsgBoxReqMsg, NowSessionMsgBoxRspMsg, NowStatusError, NowSystemCapsetFlags, - NowSystemMessage, SetKbdLayoutOption, + NowMsgBoxResponse, NowProtoError, NowProtoVersion, NowRdmMessage, NowSessionCapsetFlags, NowSessionMessage, + NowSessionMsgBoxReqMsg, NowSessionMsgBoxRspMsg, NowStatusError, NowSystemCapsetFlags, NowSystemMessage, + SetKbdLayoutOption, }; use tokio::select; use tokio::sync::mpsc::{self, Receiver, Sender}; use tracing::{error, info, warn}; use win_api_wrappers::event::Event; +use win_api_wrappers::process::Process; use win_api_wrappers::security::privilege::ScopedPrivileges; use win_api_wrappers::utils::WideString; use windows::Win32::Foundation::{HWND, LPARAM, WPARAM}; @@ -38,6 +38,7 @@ use crate::dvc::channel::{WinapiSignaledSender, bounded_mpsc_channel, winapi_sig use crate::dvc::fs::TmpFileGuard; use crate::dvc::io::run_dvc_io; use crate::dvc::process::{ExecError, ServerChannelEvent, WinApiProcess, WinApiProcessBuilder}; +use crate::dvc::rdm::RdmMessageProcessor; // One minute heartbeat interval by default const DEFAULT_HEARTBEAT_INTERVAL: core::time::Duration = core::time::Duration::from_secs(60); @@ -289,6 +290,7 @@ struct MessageProcessor { #[allow(dead_code)] // Not yet used. capabilities: NowChannelCapsetMsg, sessions: HashMap, + rdm_handler: RdmMessageProcessor, } impl MessageProcessor { @@ -297,11 +299,13 @@ impl MessageProcessor { dvc_tx: WinapiSignaledSender>, io_notification_tx: Sender, ) -> Self { + let rdm_handler = RdmMessageProcessor::new(dvc_tx.clone()); Self { dvc_tx, io_notification_tx, capabilities, sessions: HashMap::new(), + rdm_handler, } } @@ -331,20 +335,6 @@ impl MessageProcessor { message: NowMessage<'static>, ) -> anyhow::Result { match message { - NowMessage::Rdm(NowRdmMessage::Capabilities(_)) => { - // Send empty capabilities (as RDM app is not available on the server side). - - let server_timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .context("Failed to get current timestamp")? - .as_secs(); - - let rdm_caps = NowRdmCapabilitiesMsg::new(server_timestamp, String::new())?; - - self.dvc_tx - .send(NowMessage::Rdm(NowRdmMessage::Capabilities(rdm_caps))) - .await?; - } NowMessage::Channel(NowChannelMessage::Capset(client_caps)) => { return Ok(ProcessMessageAction::Restart(client_caps)); } @@ -481,8 +471,8 @@ impl MessageProcessor { } } NowMessage::System(NowSystemMessage::Shutdown(shutdown_msg)) => { - let mut current_process_token = win_api_wrappers::process::Process::current_process() - .token(TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY)?; + let mut current_process_token = + Process::current_process().token(TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY)?; let mut _priv_tcb = ScopedPrivileges::enter( &mut current_process_token, &[win_api_wrappers::security::privilege::SE_SHUTDOWN_NAME], @@ -527,6 +517,33 @@ impl MessageProcessor { // TODO: Adjust `NowSession` token privileges in NowAgent to make shutdown possible // from this process. } + NowMessage::Rdm(NowRdmMessage::Capabilities(rdm_caps_msg)) => { + match self.rdm_handler.process_capabilities(rdm_caps_msg).await { + Ok(response_msg) => { + self.dvc_tx.send(response_msg).await?; + } + Err(error) => { + error!(%error, "Failed to process RDM capabilities message"); + } + } + } + NowMessage::Rdm(NowRdmMessage::AppStart(rdm_app_start_msg)) => { + // Start RDM in background task (non-blocking) - needs capabilities + self.rdm_handler + .process_app_start(rdm_app_start_msg, self.capabilities.clone()); + info!("RDM application start initiated in background"); + } + NowMessage::Rdm(other_rdm_msg) => { + // Forward all other RDM messages (including AppAction) to RDM via pipe + match self.rdm_handler.forward_message(other_rdm_msg).await { + Ok(()) => { + info!("RDM message forwarded to pipe successfully"); + } + Err(error) => { + error!(%error, "Failed to forward RDM message to pipe"); + } + } + } _ => { warn!("Unsupported message: {:?}", message); } diff --git a/devolutions-session/src/main.rs b/devolutions-session/src/main.rs index fad2ba5f4..a7b0056ba 100644 --- a/devolutions-session/src/main.rs +++ b/devolutions-session/src/main.rs @@ -3,7 +3,10 @@ #![cfg_attr(windows, windows_subsystem = "windows")] #[cfg(all(windows, feature = "dvc"))] -use ::{async_trait as _, now_proto_pdu as _, tempfile as _, thiserror as _, win_api_wrappers as _, windows as _}; +use ::{ + async_trait as _, devolutions_agent_shared as _, now_proto_pdu as _, tempfile as _, thiserror as _, + win_api_wrappers as _, windows as _, +}; use ::{ camino as _, cfg_if as _, ctrlc as _, devolutions_log as _, futures as _, parking_lot as _, serde as _, serde_json as _, tap as _,