Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions components/backup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ tikv = { workspace = true }
tikv_alloc = { workspace = true }
tikv_util = { workspace = true }
tokio = { version = "1.5", features = ["rt-multi-thread"] }
tokio-util = { version = "0.7", features = ["compat"] }
tokio-stream = "0.1"
txn_types = { workspace = true }
uuid = "0.8"
yatp = { workspace = true }

[dev-dependencies]
Expand Down
234 changes: 230 additions & 4 deletions components/backup/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use std::{
use async_channel::SendError;
use causal_ts::{CausalTsProvider, CausalTsProviderImpl};
use concurrency_manager::ConcurrencyManager;
use engine_traits::{name_to_cf, raw_ttl::ttl_current_ts, CfName, KvEngine, SstCompressionType};
use external_storage::{BackendConfig, HdfsConfig};
use engine_traits::{name_to_cf, raw_ttl::ttl_current_ts, CfName, KvEngine, SstCompressionType, Checkpointer, CF_DEFAULT, CF_WRITE};
use external_storage::{BackendConfig, HdfsConfig, UnpinReader};
use external_storage_export::{create_storage, ExternalStorage};
use futures::{channel::mpsc::*, executor::block_on};
use kvproto::{
Expand All @@ -24,6 +24,7 @@ use kvproto::{
use online_config::OnlineConfig;
use raft::StateRole;
use raftstore::coprocessor::RegionInfoProvider;
use segment_manager::SegmentMapManager;
use tikv::{
config::BackupConfig,
storage::{
Expand All @@ -42,6 +43,7 @@ use tikv_util::{
worker::Runnable,
};
use tokio::runtime::Runtime;
use tokio_util::compat::TokioAsyncReadCompatExt;
use txn_types::{Key, Lock, TimeStamp};

use crate::{
Expand Down Expand Up @@ -71,6 +73,8 @@ struct Request {
compression_level: i32,
cipher: CipherInfo,
replica_read: bool,
mode: BackupMode,
ssts_id: String,
}

// Backup Operation corrosponsed to backup service
Expand Down Expand Up @@ -103,6 +107,8 @@ impl fmt::Debug for Task {
.field("is_raw_kv", &self.request.is_raw_kv)
.field("dst_api_ver", &self.request.dst_api_ver)
.field("cf", &self.request.cf)
.field("mode", &self.request.mode)
.field("unique_id", &self.request.ssts_id)
.finish()
}
}
Expand Down Expand Up @@ -141,6 +147,8 @@ impl Task {
compression_type: req.get_compression_type(),
compression_level: req.get_compression_level(),
replica_read: req.get_replica_read(),
mode: req.get_mode(),
ssts_id: req.unique_id,
cipher: req.cipher_info.unwrap_or_else(|| {
let mut cipher = CipherInfo::default();
cipher.set_cipher_type(EncryptionMethod::Plaintext);
Expand Down Expand Up @@ -298,6 +306,94 @@ async fn send_to_worker_with_metrics<EK: KvEngine>(
Ok(())
}

struct SstSendInfo {
file_names_d: Vec<Vec<(String, usize)>>,
file_names_w: Vec<Vec<(String, usize)>>,
start_key: Vec<u8>,
end_key: Vec<u8>,
}

async fn save_sst_file_worker (
mut segment_manager: SegmentMapManager,
rx: async_channel::Receiver<SstSendInfo>,
tx: UnboundedSender<BackupResponse>,
storage: Arc<dyn ExternalStorage>,
) {
while let Ok(msg) = rx.recv().await {
let mut response = BackupResponse::default();
let mut d_progress_l = 0;
let mut d_progress_f = 0;
let mut w_progress_l = 0;
let mut w_progress_f = 0;
match upload_sst_file(
&msg.file_names_d,
&msg.file_names_w,
&mut d_progress_l, &mut d_progress_f,
&mut w_progress_l, &mut w_progress_f,
storage.clone()).await {
Err(e) => {
error_unknown!(?e; "upload sst file failed";
"start_key" => &log_wrappers::Value::key(&msg.start_key),
"end_key" => &log_wrappers::Value::key(&msg.end_key),
);
let e = Error::from(e);
response.set_error(e.into());
}
Ok(_) => {
let file = File::default();
response.set_files(vec![file].into());
}
}

segment_manager.release_index(
msg.file_names_d, d_progress_l, d_progress_f,
msg.file_names_w, w_progress_l, w_progress_f,
);

response.set_start_key(msg.start_key.clone());
response.set_end_key(msg.end_key.clone());
// todo: send the count.
if let Err(e) = tx.unbounded_send(response) {
error_unknown!(?e; "backup failed to send response";
"start_key" => &log_wrappers::Value::key(&msg.start_key),
"end_key" => &log_wrappers::Value::key(&msg.end_key),);
if e.is_disconnected() {
return;
}
}
}
}

async fn upload_sst_file(
ssts_d: &Vec<Vec<(String, usize)>>,
ssts_w: &Vec<Vec<(String, usize)>>,
d_progress_l: &mut usize, d_progress_f: &mut usize,
w_progress_l: &mut usize, w_progress_f: &mut usize,
storage: Arc<dyn ExternalStorage>,
) -> std::io::Result<()> {
upload_sst_file_internal(ssts_d, d_progress_l, d_progress_f, storage.clone()).await?;
upload_sst_file_internal(ssts_w, w_progress_l, w_progress_f, storage).await
}

async fn upload_sst_file_internal(
ssts: &Vec<Vec<(String, usize)>>,
progress_l: &mut usize,
progress_f: &mut usize,
storage: Arc<dyn ExternalStorage>,
) -> std::io::Result<()> {
for fs in ssts {
for (file_name, _) in fs {
Comment thread
Leavrth marked this conversation as resolved.
Outdated
let file = tokio::fs::File::open(file_name).await?;
Comment thread
Leavrth marked this conversation as resolved.
Outdated
let length = file.metadata().await?.len();
let reader = UnpinReader(Box::new(file.compat()));
storage.write(file_name, reader, length).await?;
*progress_f += 1;
}
*progress_l += 1;
}
Ok(())
}

impl BackupRange {
/// Get entries from the scanner and save them to storage
async fn backup<E: Engine>(
Expand Down Expand Up @@ -690,6 +786,7 @@ pub struct Endpoint<E: Engine, R: RegionInfoProvider + Clone + 'static> {
tablets: LocalTablets<E::Local>,
config_manager: ConfigManager,
concurrency_manager: ConcurrencyManager,
segment_manager: SegmentMapManager,
softlimit: SoftLimitKeeper,
api_version: ApiVersion,
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used in rawkv apiv2 only
Expand Down Expand Up @@ -853,6 +950,7 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
let config_manager = ConfigManager(Arc::new(RwLock::new(config)));
let softlimit = SoftLimitKeeper::new(config_manager.clone());
rt.spawn(softlimit.clone().run());
let segment_manager = SegmentMapManager::new();
Endpoint {
store_id,
engine,
Expand All @@ -863,6 +961,7 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
softlimit,
config_manager,
concurrency_manager,
segment_manager,
api_version,
causal_ts_provider,
}
Expand Down Expand Up @@ -1114,6 +1213,36 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
let backend = Arc::<dyn ExternalStorage>::from(backend);
let concurrency = self.config_manager.0.read().unwrap().num_threads;
self.pool.borrow_mut().adjust_with(concurrency);

match request.mode {
BackupMode::Scan => self.handle_scan_backup(
concurrency,
prs,
backend,
codec,
request,
resp,
),
BackupMode::File => self.handle_file_backup(
concurrency,
prs,
backend,
request,
resp,
),
_ => error!("unknown backup mode"; "mode" => ?request.mode),
};
}

fn handle_scan_backup(
&self,
concurrency: usize,
prs: Arc<Mutex<Progress<R>>>,
backend: Arc<dyn ExternalStorage>,
codec: KeyValueCodec,
request: Request,
resp: UnboundedSender<BackupResponse>,
) {
let (tx, rx) = async_channel::bounded(1);
for _ in 0..concurrency {
self.spawn_backup_worker(
Expand All @@ -1131,6 +1260,94 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
));
}
}

fn handle_file_backup(
&self,
concurrency: usize,
prs: Arc<Mutex<Progress<R>>>,
backend: Arc<dyn ExternalStorage>,
request: Request,
resp_tx: UnboundedSender<BackupResponse>,
) {
let (tx, rx) = async_channel::bounded(1);
for _ in 0..concurrency {
self.io_pool.spawn(save_sst_file_worker(
self.segment_manager.clone(),
rx.clone(),
resp_tx.clone(),
backend.clone(),
));
}

let mut segment_manager = self.segment_manager.clone();
let id = request.ssts_id;
let batch_size = self.config_manager.0.read().unwrap().batch_size;
self.pool.borrow_mut().spawn(async move {
loop {
let batch = {
let progress: &mut Progress<_> = &mut prs.lock().unwrap();
let batch = progress.forward(batch_size, request.replica_read);
if batch.is_empty() {
return;
}
batch
};

for brange in batch {
if request.cancel.load(Ordering::SeqCst) {
warn!("backup task has canceled"; "range" => ?brange);
return;
}
let start_key = brange.start_key.map_or_else(Vec::new, |k| k.into_raw().unwrap());
let end_key = brange.end_key.map_or_else(Vec::new, |k| k.into_raw().unwrap());
let (d_ssts, w_ssts) = segment_manager.find_ssts(&id, &start_key, &end_key);
if let Err(err) = tx.send(SstSendInfo {
file_names_d: d_ssts.clone(),
file_names_w: w_ssts.clone(),
start_key,
end_key,
}).await {
error_unknown!(%err; "error during backup");
segment_manager.release_index(
d_ssts,
usize::MAX,
usize::MAX,
w_ssts,
usize::MAX,
usize::MAX,
);
let mut resp = BackupResponse::new();
let err = Error::from(err);
resp.set_error(err.into());
if let Err(err) = resp_tx.unbounded_send(resp) {
warn!("failed to send response"; "err" => ?err)
}
}
}
}

})
}

pub fn prepare(&mut self, _persistence: bool, mut tx: Sender<PrepareResponse>) {
let checkpointer = self.engine.checkpointer().unwrap();
let default_metadata = checkpointer.column_family_meta_data(CF_DEFAULT).unwrap();
let write_metadata = checkpointer.column_family_meta_data(CF_WRITE).unwrap();

let s1 = format!("default_segmentmap:{:?}", default_metadata);
let s2 = format!("write_segmentmap:{:?}", write_metadata);
info!("{}", s1);
info!("{}", s2);

let id = self.segment_manager.register(default_metadata.ssts, write_metadata.ssts);
let mut resp = PrepareResponse::new();
resp.set_unique_id(id);
resp.set_collect_file_count((default_metadata.file_count + write_metadata.file_count) as u64);
resp.set_collect_file_size((default_metadata.file_size + write_metadata.file_size) as u64);
if let Err(e) = tx.try_send(resp) {
error_unknown!(?e; "[prepare] failed to send response");
}
}
}

impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Runnable for Endpoint<E, R> {
Expand All @@ -1146,8 +1363,9 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Runnable for Endpoint<E
info!("run backup task"; "task" => %task);
self.handle_backup_task(task);
}
Operation::Prepare(_persistent, _tx) => {
unimplemented!();
Operation::Prepare(persistent, tx) => {
info!("run prepare");
self.prepare(persistent, tx);
}
Operation::Cleanup(_unique_id, _tx) => {
unimplemented!();
Expand Down Expand Up @@ -1585,6 +1803,8 @@ pub mod tests {
compression_level: 0,
cipher: CipherInfo::default(),
replica_read: false,
mode: BackupMode::Scan,
ssts_id: String::from("test"),
},
resp: tx,
};
Expand Down Expand Up @@ -1694,6 +1914,8 @@ pub mod tests {
compression_level: 0,
cipher: CipherInfo::default(),
replica_read: false,
mode: BackupMode::Scan,
ssts_id: String::from("test"),
},
resp: tx,
};
Expand Down Expand Up @@ -1723,6 +1945,8 @@ pub mod tests {
compression_level: 0,
cipher: CipherInfo::default(),
replica_read: true,
mode: BackupMode::Scan,
ssts_id: String::from("test"),
},
resp: tx,
};
Expand Down Expand Up @@ -1836,6 +2060,8 @@ pub mod tests {
compression_level: 0,
cipher: CipherInfo::default(),
replica_read: false,
mode: BackupMode::Scan,
ssts_id: String::from("test"),
},
resp: tx,
};
Expand Down
1 change: 1 addition & 0 deletions components/backup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod service;
mod softlimit;
mod utils;
mod writer;
mod segment_manager;

pub use endpoint::{backup_file_name, Endpoint, Operation, Task};
pub use errors::{Error, Result};
Expand Down
Loading