refactor: migrate backoff crate to backon (#6718)

Replace backoff 0.4.0 with backon 1.6.0 for retry logic.
This commit is contained in:
Tunglies 2026-04-03 21:21:04 +08:00 committed by GitHub
parent 5da9f99698
commit 830c0773dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 78 additions and 92 deletions

29
Cargo.lock generated
View File

@ -559,16 +559,13 @@ dependencies = [
] ]
[[package]] [[package]]
name = "backoff" name = "backon"
version = "0.4.0" version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef"
dependencies = [ dependencies = [
"futures-core", "fastrand 2.3.0",
"getrandom 0.2.17", "gloo-timers",
"instant",
"pin-project-lite",
"rand 0.8.5",
"tokio", "tokio",
] ]
@ -1117,7 +1114,7 @@ dependencies = [
"anyhow", "anyhow",
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"backoff", "backon",
"base64 0.22.1", "base64 0.22.1",
"bitflags 2.11.0", "bitflags 2.11.0",
"boa_engine", "boa_engine",
@ -1243,7 +1240,7 @@ dependencies = [
[[package]] [[package]]
name = "clash_verge_service_ipc" name = "clash_verge_service_ipc"
version = "2.2.0" version = "2.2.0"
source = "git+https://github.com/clash-verge-rev/clash-verge-service-ipc#b73568a9ecc9e62577e9ce81a123b554f06a9fb3" source = "git+https://github.com/clash-verge-rev/clash-verge-service-ipc#62e0fe76279350303373e13cbdb6af32a04abe0f"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"compact_str", "compact_str",
@ -3039,6 +3036,18 @@ dependencies = [
"walkdir", "walkdir",
] ]
[[package]]
name = "gloo-timers"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]] [[package]]
name = "gobject-sys" name = "gobject-sys"
version = "0.18.0" version = "0.18.0"

View File

@ -91,7 +91,7 @@ gethostname = "1.1.0"
scopeguard = "1.2.0" scopeguard = "1.2.0"
tauri-plugin-notification = "2.3.3" tauri-plugin-notification = "2.3.3"
tokio-stream = "0.1.18" tokio-stream = "0.1.18"
backoff = { version = "0.4.0", features = ["tokio"] } backon = { version = "1.6.0", features = ["tokio-sleep"] }
tauri-plugin-http = "2.5.7" tauri-plugin-http = "2.5.7"
console-subscriber = { version = "0.5.0", optional = true } console-subscriber = { version = "0.5.0", optional = true }
tauri-plugin-devtools = { version = "2.0.1" } tauri-plugin-devtools = { version = "2.0.1" }

View File

@ -13,7 +13,7 @@ use crate::{
utils::{dirs, help}, utils::{dirs, help},
}; };
use anyhow::{Result, anyhow}; use anyhow::{Result, anyhow};
use backoff::{Error as BackoffError, ExponentialBackoff}; use backon::{ExponentialBuilder, Retryable as _};
use clash_verge_draft::Draft; use clash_verge_draft::Draft;
use clash_verge_logging::{Type, logging, logging_error}; use clash_verge_logging::{Type, logging, logging_error};
use serde_yaml_ng::{Mapping, Value}; use serde_yaml_ng::{Mapping, Value};
@ -204,23 +204,21 @@ impl Config {
} }
pub async fn verify_config_initialization() { pub async fn verify_config_initialization() {
let backoff_strategy = ExponentialBackoff { let backoff = ExponentialBuilder::default()
initial_interval: std::time::Duration::from_millis(100), .with_min_delay(std::time::Duration::from_millis(100))
max_interval: std::time::Duration::from_secs(2), .with_max_delay(std::time::Duration::from_secs(2))
max_elapsed_time: Some(std::time::Duration::from_secs(10)), .with_factor(2.0)
multiplier: 2.0, .with_max_times(10);
..Default::default()
};
let operation = || async { if let Err(e) = (|| async {
if Self::runtime().await.latest_arc().config.is_some() { if Self::runtime().await.latest_arc().config.is_some() {
return Ok::<(), BackoffError<anyhow::Error>>(()); return Ok::<(), anyhow::Error>(());
} }
Self::generate().await
Self::generate().await.map_err(BackoffError::transient) })
}; .retry(backoff)
.await
if let Err(e) = backoff::future::retry(backoff_strategy, operation).await { {
logging!(error, Type::Setup, "Config init verification failed: {}", e); logging!(error, Type::Setup, "Config init verification failed: {}", e);
} }
} }

View File

@ -2,6 +2,7 @@ use crate::constants::files::DNS_CONFIG;
use crate::{config::Config, process::AsyncHandler, utils::dirs}; use crate::{config::Config, process::AsyncHandler, utils::dirs};
use anyhow::Error; use anyhow::Error;
use arc_swap::{ArcSwap, ArcSwapOption}; use arc_swap::{ArcSwap, ArcSwapOption};
use backon::{ConstantBuilder, Retryable as _};
use clash_verge_logging::{Type, logging}; use clash_verge_logging::{Type, logging};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use reqwest_dav::list_cmd::{ListEntity, ListFile}; use reqwest_dav::list_cmd::{ListEntity, ListFile};
@ -166,40 +167,25 @@ impl WebDavClient {
let client = self.get_client(Operation::Upload).await?; let client = self.get_client(Operation::Upload).await?;
let webdav_path: String = format!("{}/{}", dirs::BACKUP_DIR, file_name).into(); let webdav_path: String = format!("{}/{}", dirs::BACKUP_DIR, file_name).into();
// 读取文件并上传,如果失败尝试一次重试
let file_content = fs::read(&file_path).await?; let file_content = fs::read(&file_path).await?;
// 添加超时保护 let backoff = ConstantBuilder::default()
let upload_result = timeout( .with_delay(Duration::from_millis(500))
.with_max_times(1);
(|| async {
timeout(
Duration::from_secs(TIMEOUT_UPLOAD), Duration::from_secs(TIMEOUT_UPLOAD),
client.put(&webdav_path, file_content.clone()), client.put(&webdav_path, file_content.clone()),
) )
.await;
match upload_result {
Err(_) => {
logging!(warn, Type::Backup, "Warning: Upload timed out, retrying once");
tokio::time::sleep(Duration::from_millis(500)).await;
timeout(
Duration::from_secs(TIMEOUT_UPLOAD),
client.put(&webdav_path, file_content),
)
.await??; .await??;
Ok(()) Ok::<(), Error>(())
} })
.retry(backoff)
Ok(Err(e)) => { .notify(|err, dur| {
logging!(warn, Type::Backup, "Warning: Upload failed, retrying once: {e}"); logging!(warn, Type::Backup, "Upload failed: {err}, retrying in {dur:?}");
tokio::time::sleep(Duration::from_millis(500)).await; })
timeout( .await
Duration::from_secs(TIMEOUT_UPLOAD),
client.put(&webdav_path, file_content),
)
.await??;
Ok(())
}
Ok(Ok(_)) => Ok(()),
}
} }
pub async fn download(&self, filename: String, storage_path: PathBuf) -> Result<(), Error> { pub async fn download(&self, filename: String, storage_path: PathBuf) -> Result<(), Error> {

View File

@ -84,7 +84,7 @@ impl CoreManager {
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
async fn wait_for_service_if_needed(&self) { async fn wait_for_service_if_needed(&self) {
use crate::{config::Config, constants::timing, core::service}; use crate::{config::Config, constants::timing, core::service};
use backoff::{Error as BackoffError, ExponentialBackoff}; use backon::{ConstantBuilder, Retryable as _};
let needs_service = Config::verge().await.latest_arc().enable_tun_mode.unwrap_or(false); let needs_service = Config::verge().await.latest_arc().enable_tun_mode.unwrap_or(false);
@ -92,16 +92,12 @@ impl CoreManager {
return; return;
} }
let backoff = ExponentialBackoff { let max_times = timing::SERVICE_WAIT_MAX.as_millis() / timing::SERVICE_WAIT_INTERVAL.as_millis();
initial_interval: timing::SERVICE_WAIT_INTERVAL, let backoff = ConstantBuilder::default()
max_interval: timing::SERVICE_WAIT_INTERVAL, .with_delay(timing::SERVICE_WAIT_INTERVAL)
max_elapsed_time: Some(timing::SERVICE_WAIT_MAX), .with_max_times(max_times as usize);
multiplier: 1.0,
randomization_factor: 0.0,
..Default::default()
};
let operation = || async { let _ = (|| async {
let mut manager = SERVICE_MANAGER.lock().await; let mut manager = SERVICE_MANAGER.lock().await;
if matches!(manager.current(), ServiceStatus::Ready) { if matches!(manager.current(), ServiceStatus::Ready) {
@ -111,19 +107,19 @@ impl CoreManager {
// If the service IPC path is not ready yet, treat it as transient and retry. // If the service IPC path is not ready yet, treat it as transient and retry.
// Running init/refresh too early can mark service state unavailable and break later config reloads. // Running init/refresh too early can mark service state unavailable and break later config reloads.
if !service::is_service_ipc_path_exists() { if !service::is_service_ipc_path_exists() {
return Err(BackoffError::transient(anyhow::anyhow!("Service IPC not ready"))); return Err(anyhow::anyhow!("Service IPC not ready"));
} }
manager.init().await.map_err(BackoffError::transient)?; manager.init().await?;
let _ = manager.refresh().await; let _ = manager.refresh().await;
if matches!(manager.current(), ServiceStatus::Ready) { if matches!(manager.current(), ServiceStatus::Ready) {
Ok(()) Ok(())
} else { } else {
Err(BackoffError::transient(anyhow::anyhow!("Service not ready"))) Err(anyhow::anyhow!("Service not ready"))
} }
}; })
.retry(backoff)
let _ = backoff::future::retry(backoff, operation).await; .await;
} }
} }

View File

@ -4,6 +4,7 @@ use crate::{
utils::dirs, utils::dirs,
}; };
use anyhow::{Context as _, Result, anyhow, bail}; use anyhow::{Context as _, Result, anyhow, bail};
use backon::{ConstantBuilder, Retryable as _};
use clash_verge_logging::{Type, logging, logging_error}; use clash_verge_logging::{Type, logging, logging_error};
use clash_verge_service_ipc::CoreConfig; use clash_verge_service_ipc::CoreConfig;
use compact_str::CompactString; use compact_str::CompactString;
@ -15,7 +16,7 @@ use std::{
process::Command as StdCommand, process::Command as StdCommand,
time::Duration, time::Duration,
}; };
use tokio::{sync::Mutex, time::sleep}; use tokio::sync::Mutex;
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum ServiceStatus { pub enum ServiceStatus {
@ -441,31 +442,27 @@ pub async fn wait_and_check_service_available(status: &mut ServiceManager) -> Re
async fn wait_for_service_ipc(status: &mut ServiceManager, reason: &str) -> Result<()> { async fn wait_for_service_ipc(status: &mut ServiceManager, reason: &str) -> Result<()> {
status.0 = ServiceStatus::Unavailable(reason.into()); status.0 = ServiceStatus::Unavailable(reason.into());
let config = ServiceManager::config(); let config = ServiceManager::config();
let mut attempts = 0u32;
#[allow(unused_assignments)]
let mut last_err = anyhow!("service not ready");
loop { let backoff = ConstantBuilder::default()
.with_delay(config.retry_delay)
.with_max_times(config.max_retries);
let result = (|| async {
if Path::new(clash_verge_service_ipc::IPC_PATH).exists() { if Path::new(clash_verge_service_ipc::IPC_PATH).exists() {
match clash_verge_service_ipc::connect().await { clash_verge_service_ipc::connect().await?;
Ok(_) => { Ok(())
status.0 = ServiceStatus::Ready;
return Ok(());
}
Err(e) => last_err = e,
}
} else { } else {
last_err = anyhow!("IPC path not ready"); Err(anyhow!("IPC path not ready"))
}
})
.retry(backoff)
.await;
if result.is_ok() {
status.0 = ServiceStatus::Ready;
} }
if attempts >= config.max_retries as u32 { result
break;
}
attempts += 1;
sleep(config.retry_delay).await;
}
Err(last_err)
} }
pub fn is_service_ipc_path_exists() -> bool { pub fn is_service_ipc_path_exists() -> bool {