Skip to content

Commit 9d28704

Browse files
committed
Support multi threaded partial-content download.
1 parent 2386f9d commit 9d28704

File tree

7 files changed

+5838
-51
lines changed

7 files changed

+5838
-51
lines changed

2-mb-file.txt

+5,697
Large diffs are not rendered by default.

Cargo.lock

+7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/http_message.rs

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ pub struct HTTPMessage {
99

1010
impl HTTPMessage {
1111
pub fn new(raw_message: &str) -> HTTPMessage {
12+
println!("Raw message while parsing \n`{}`\n", raw_message);
13+
1214
let (headers, body) = raw_message.split_once("\r\n\r\n").unwrap();
1315
let lines = headers.lines().enumerate();
1416
let mut headers: HashMap<String, String> = HashMap::new();

src/lib.rs

+60
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
mod http_message;
22
pub mod tcp;
3+
use std::fs;
4+
use std::{cmp::min, str, thread, time};
5+
36
pub use crate::http_message::HTTPMessage;
7+
use crate::tcp::{download_part, head};
8+
9+
static DIVISIONS: usize = 302400;
410

511
pub fn find_subsequence<T>(haystack: &[T], needle: &[T]) -> Option<usize>
612
where
@@ -10,3 +16,57 @@ where
1016
.windows(needle.len())
1117
.position(|window| window == needle)
1218
}
19+
20+
pub fn download(server_details: &str, path: &str) {
21+
let head_req = head(&server_details, path);
22+
// println!("{:?}", head_req);
23+
24+
if let Some(value) = head_req.headers.get("Accept-Ranges") {
25+
if value != "bytes" {
26+
eprintln!("Server does not support downloading byte ranges");
27+
return;
28+
}
29+
30+
let content_length: usize =
31+
str::parse(head_req.headers.get("Content-Length").unwrap()).unwrap();
32+
33+
println!("Content-Length: {}", content_length);
34+
35+
let mut buf: Vec<u8> = Vec::with_capacity(content_length);
36+
let mut threads = Vec::new();
37+
38+
let mut consumed = 0;
39+
let mut i = 0;
40+
while consumed < content_length {
41+
println!("{}", i);
42+
i += 1;
43+
let address = server_details.to_string();
44+
let path = path.to_string();
45+
46+
threads.push(thread::spawn(move || {
47+
let data = download_part(
48+
address,
49+
path,
50+
consumed,
51+
min(consumed + DIVISIONS, content_length - 1),
52+
);
53+
data
54+
}));
55+
56+
// let one_second = time::Duration::from_secs(1);
57+
// thread::sleep(one_second);
58+
59+
consumed += DIVISIONS;
60+
}
61+
62+
for handle in threads {
63+
let data = handle.join().unwrap();
64+
buf.extend(data);
65+
}
66+
67+
fs::write("data.txt", buf).unwrap();
68+
} else {
69+
eprintln!("Mult-threaded downloads not supported for this request.");
70+
return;
71+
}
72+
}

src/main.rs

+8-12
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
1-
use multidl::{find_subsequence, tcp::download_part};
2-
3-
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
1+
use multidl::download;
2+
use std::env;
43

54
fn main() {
6-
let server_details = "example.com:80";
7-
let server: Vec<_> = server_details
8-
.to_socket_addrs()
9-
.expect("Unable to resolve domain")
10-
.collect();
11-
println!("{:?}", server);
5+
let args: Vec<String> = env::args().collect();
6+
println!("{:?}", args);
7+
8+
let server_details = &args[1];
9+
let path = &args[2];
1210

13-
// Even easier, if you want to connect right away:
14-
let stream = TcpStream::connect(server_details).expect("Unable to connect to server");
15-
// let listener = TcpListener::bind(server[1]).unwrap();
11+
download(server_details, path)
1612
}

src/tcp.rs

+58-38
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,71 @@
11
use std::{
2-
fmt::format,
32
io::{Read, Write},
4-
net::TcpListener,
3+
net::TcpStream,
4+
str,
55
};
66

7-
use std::net::{SocketAddr, ToSocketAddrs};
7+
use crate::{find_subsequence, HTTPMessage};
88

9-
use std::str;
9+
static BUFFER_SIZE: usize = 409600;
1010

11-
use crate::{find_subsequence, HTTPMessage};
11+
pub fn head(address: &str, path: &str) -> HTTPMessage {
12+
let mut stream = TcpStream::connect(address).expect("Unable to connect to server");
13+
let (host, _) = address.split_once(":").unwrap();
14+
let request = format!(
15+
"HEAD {} HTTP/1.1\r\nHost: {}\r\nAccept: */*\r\n\r\n",
16+
path, host
17+
);
18+
stream.write(request.as_bytes()).unwrap();
19+
let mut buffer: Vec<u8> = vec![0; 1024];
20+
stream.read(&mut buffer).unwrap();
1221

13-
pub fn download_part(
14-
address: &str,
15-
path: &str,
16-
bytes_from: usize,
17-
bytes_to: usize,
18-
) -> Option<Vec<u8>> {
19-
let addr = address.to_socket_addrs().unwrap().next().unwrap();
20-
println!("{:?}", addr);
21-
let listener = TcpListener::bind(addr).unwrap();
22+
HTTPMessage::new(str::from_utf8(&buffer).unwrap())
23+
}
24+
25+
pub fn download_part(address: String, path: String, bytes_from: usize, bytes_to: usize) -> Vec<u8> {
26+
let mut stream = TcpStream::connect(&address).expect("Unable to connect to server");
2227
let (host, _) = address.split_once(":").unwrap();
23-
for stream in listener.incoming() {
24-
match stream {
25-
Ok(mut stream) => {
26-
let request = format!(
27-
"GET {} HTTP/1.1\r\nHost: {}\r\nRange: bytes={}-{}\r\nAccept: */*\r\n\r\n",
28-
path, host, bytes_from, bytes_to
29-
);
30-
stream.write(request.as_bytes());
31-
32-
let body_size = bytes_to - bytes_from + 1;
33-
let mut buffer: Vec<u8> = Vec::with_capacity(body_size + 1024);
34-
let body_break: Vec<u8> = vec![13, 10];
35-
stream.read_to_end(&mut buffer).unwrap();
36-
37-
let loc_body_break = find_subsequence(&buffer, &body_break).unwrap();
38-
let metadata = str::from_utf8(&buffer[0..loc_body_break + 2]).unwrap();
39-
let message = HTTPMessage::new(metadata);
40-
println!("{:?}", message);
41-
}
42-
Err(e) => {
43-
eprintln!("Failed to get stream from Tcp Listener : {}", e);
44-
}
28+
let request = format!(
29+
"GET {} HTTP/1.1\r\nHost: {}\r\nRange: bytes={}-{}\r\nAccept: */*\r\n\r\n",
30+
path, host, bytes_from, bytes_to
31+
);
32+
stream.write(request.as_bytes()).unwrap();
33+
34+
let body_size = bytes_to - bytes_from + 1;
35+
let mut buffer: Vec<u8> = vec![0; BUFFER_SIZE];
36+
let mut body: Vec<u8> = Vec::with_capacity(body_size);
37+
let body_break: Vec<u8> = "\r\n\r\n".as_bytes().to_vec();
38+
let mut content_consumed: usize = 0;
39+
40+
let mut first = true;
41+
42+
while content_consumed < body_size {
43+
let bytes_read = stream.read(&mut buffer).unwrap();
44+
println!("Read {} bytes", bytes_read);
45+
46+
if bytes_read == 0 {
47+
panic!("WTF");
4548
}
4649

47-
return None;
50+
if first {
51+
first = false;
52+
let loc_body_break = find_subsequence(&buffer, &body_break).unwrap();
53+
let metadata = str::from_utf8(&buffer[0..loc_body_break + 4]).unwrap();
54+
let message = HTTPMessage::new(metadata);
55+
let _content_length: usize =
56+
str::parse(message.headers.get("Content-Length").unwrap()).unwrap();
57+
let content = &buffer[loc_body_break + 4..bytes_read];
58+
content_consumed += content.len();
59+
body.extend_from_slice(content);
60+
} else {
61+
content_consumed += bytes_read;
62+
body.extend_from_slice(&buffer[0..bytes_read]);
63+
}
64+
65+
println!("Consumed {} bytes of total {}", content_consumed, body_size);
4866
}
4967

50-
unimplemented!()
68+
assert_eq!(body.len(), body_size);
69+
70+
return body;
5171
}

src/tempCodeRunnerFile.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -1 +1,6 @@
1-
connect
1+
// println!(
2+
// "{}/{} -> `{}`",
3+
// num_bytes_read_now,
4+
// body_size,
5+
// str::from_utf8(&buffer).unwrap()
6+
// );

0 commit comments

Comments
 (0)