use crate::params::RequestParams;
use crate::{Client, Error, Params, Result};
use reqwest::{multipart, Body};
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tracing::{trace, warn};
// Submodule providing `MultipartParams`, which the upload functions in this
// file use to bundle request parameters with multipart body parts.
pub(crate) mod params;
/// JSON envelope for the reply to a chunk request in `chunked_upload`; the
/// data of interest sits under the `upload` key.
#[derive(Deserialize, Debug)]
struct Response {
upload: UploadResponse,
}
/// Outcome of sending one chunk, selected by the `result` field of the
/// `upload` object (internally tagged serde enum).
#[derive(Deserialize, Debug)]
#[serde(tag = "result")]
enum UploadResponse {
/// More chunks are expected: `offset` is sent as the `offset` parameter of
/// the next chunk request and `filekey` as its `filekey` parameter.
Continue { offset: u64, filekey: String },
/// The last chunk was accepted; `filekey` is what the publish request
/// refers to.
Success { filekey: String },
/// The server reported warnings (map key is the warning identifier);
/// `chunked_upload` turns these into an error via `handle_warnings`.
Warning { warnings: HashMap<String, Value> },
}
/// JSON envelope for replies whose `upload` object describes the state of
/// publishing a file, e.g. the publish and status-check requests issued by
/// `chunked_upload`.
#[derive(Deserialize, Debug)]
struct PublishResponse {
upload: PublishState,
}
/// State of publishing a file, selected by the `result` field of the
/// `upload` object (internally tagged serde enum).
#[derive(Deserialize, Debug)]
#[serde(tag = "result")]
enum PublishState {
/// The server is still working; `stage` is only used for logging before
/// the status is checked again.
Poll { stage: String },
/// Publishing finished; `filename` is the name returned to the caller.
Success { filename: String },
/// The server reported warnings (map key is the warning identifier);
/// these are turned into an error via `handle_warnings`.
Warning { warnings: HashMap<String, Value> },
}
/// Everything needed to upload one local file.
pub(crate) struct UploadRequest {
/// Name used to identify this upload in log messages.
// NOTE(review): presumably also the target file name, passed to the server
// through `upload_params` — confirm against the caller.
pub(crate) filename: String,
/// Path of the local file that is opened and sent.
pub(crate) file: PathBuf,
/// Size threshold in bytes: files smaller than this are sent in a single
/// request, all others in chunks of at most this many bytes.
pub(crate) chunk_size: usize,
/// Parameters included in every request made for this upload.
pub(crate) base_params: Params,
/// Extra parameters layered over `base_params` (see `upload_params()`),
/// used for the single-request upload and for the final publish request.
pub(crate) upload_params: Params,
}
impl UploadRequest {
    /// Returns a fresh parameter set made of `base_params` with every entry
    /// of `upload_params` added on top (entries from `upload_params` win on
    /// key collisions, as decided by the map's `extend`).
    fn upload_params(&self) -> Params {
        let overrides = self.upload_params.clone();
        let mut merged = self.base_params.clone();
        merged.map.extend(overrides.map);
        merged
    }
}
/// Uploads the local file described by `req` and returns the filename
/// reported by the chosen upload strategy.
///
/// Files strictly smaller than `req.chunk_size` bytes go through
/// `simple_upload`; everything else goes through `chunked_upload`.
///
/// # Errors
///
/// Fails if the file cannot be opened or its metadata cannot be read, or if
/// the selected upload strategy fails.
pub(crate) async fn upload(client: &Client, req: UploadRequest) -> Result<String> {
    let handle = File::open(&req.file).await?;
    let total_bytes = handle.metadata().await?.len();
    trace!(
        "beginning file upload, size={size}, filename={filename}, file={file}",
        size = total_bytes,
        filename = req.filename,
        file = req.file.display()
    );

    // Only files that fit below the chunk size are sent in one request.
    let needs_chunking = total_bytes >= (req.chunk_size as u64);
    let stored_name = if needs_chunking {
        chunked_upload(client, req, handle, total_bytes).await?
    } else {
        simple_upload(client, req, handle).await?
    };

    trace!(
        "finished file upload, filename={filename}",
        filename = stored_name,
    );
    Ok(stored_name)
}
async fn simple_upload(
client: &Client,
req: UploadRequest,
file: File,
) -> Result<String> {
let mut params = req.upload_params();
params.insert("token", client.token("csrf").await?);
let params = params::MultipartParams {
params,
parts: HashMap::from([(
"file".to_string(),
multipart::Part::stream(Body::from(file)).file_name("whatever"),
)]),
};
trace!(
"sending simple upload for filename={filename}, file={file}",
filename = req.filename,
file = req.file.display()
);
client
.inner
.do_request(RequestParams::Multipart(params))
.await?;
Ok("TODO".to_string())
}
async fn chunked_upload(
client: &Client,
req: UploadRequest,
mut file: File,
file_size: u64,
) -> Result<String> {
let token = client.token("csrf").await?;
let mut bytes_sent: u64 = 0;
let mut file_key: Option<String> = None;
loop {
let mut params = req.base_params.clone();
params.insert("filesize", file_size);
params.insert("stash", 1);
params.insert("offset", bytes_sent);
if let Some(file_key) = &file_key {
params.insert("filekey", file_key);
}
params.insert("token", &token);
let remaining_bytes = file_size - bytes_sent;
let to_read = if remaining_bytes < (req.chunk_size as u64) {
remaining_bytes as usize
} else {
req.chunk_size
};
let mut buf = vec![0; to_read];
let read_bytes = file.read_exact(&mut buf).await?;
trace!(
"read {bytes_len} out of file={file} for filename={filename}",
bytes_len = read_bytes,
filename = req.filename,
file = req.file.display()
);
let part = multipart::Part::bytes(buf).file_name("whatever");
let params = params::MultipartParams {
params,
parts: HashMap::from([("chunk".to_string(), part)]),
};
let resp: Response = serde_json::from_value(
client
.inner
.do_request(RequestParams::Multipart(params))
.await?,
)?;
match resp.upload {
UploadResponse::Continue { filekey, offset } => {
bytes_sent = offset;
file_key = Some(filekey);
}
UploadResponse::Success { filekey } => {
file_key = Some(filekey);
break;
}
UploadResponse::Warning { warnings } => {
return handle_warnings(warnings, &req);
}
}
}
let file_key = file_key.expect("filekey must be set by this point");
trace!(
"finished uploading file={file} for filename={filename}, under filekey={file_key}",
filename = req.filename,
file = req.file.display(),
file_key = file_key,
);
let mut params = req.upload_params();
params.insert("filekey", &file_key);
params.insert("async", 1);
let resp: PublishResponse = client.post_with_token("csrf", params).await?;
match resp.upload {
PublishState::Poll { stage } => {
trace!(
"waiting for upload of filename={filename} to finish, stage={stage}",
filename = req.filename,
stage = stage,
);
crate::time::sleep(2).await;
}
PublishState::Success { filename } => {
return Ok(filename);
}
PublishState::Warning { warnings } => {
return handle_warnings(warnings, &req);
}
}
let mut params = req.base_params.clone();
params.insert("filekey", &file_key);
params.insert("checkstatus", 1);
loop {
let resp: PublishResponse =
client.post_with_token("csrf", params.clone()).await?;
match resp.upload {
PublishState::Poll { stage } => {
trace!(
"waiting for upload of filename={filename} to finish, stage={stage}",
filename = req.filename,
stage = stage,
);
crate::time::sleep(2).await;
}
PublishState::Success { filename } => {
return Ok(filename);
}
PublishState::Warning { warnings } => {
return handle_warnings(warnings, &req);
}
}
}
}
fn handle_warnings(
warnings: HashMap<String, Value>,
req: &UploadRequest,
) -> Result<String> {
for (warning, val) in &warnings {
warn!(
"Uploading {} caused warning {warning}: {val:?}",
req.filename
);
}
Err(Error::UploadWarning(warnings.into_keys().collect()))
}