সুন্দরভাবে শাটডাউন এবং পরিচ্ছন্নতা
লিস্টিং ২১-২০ এর কোডটি আমাদের উদ্দেশ্য অনুযায়ী, একটি থ্রেড পুল ব্যবহার করে অ্যাসিঙ্ক্রোনাসভাবে রিকোয়েস্টের জবাব দিচ্ছে। আমরা workers
, id
, এবং thread
ফিল্ডগুলো সরাসরি ব্যবহার করছি না বলে কিছু ওয়ার্নিং পাচ্ছি, যা আমাদের মনে করিয়ে দিচ্ছে যে আমরা কোনো কিছুই পরিচ্ছন্ন (cleanup) করছি না। যখন আমরা প্রধান থ্রেডটি বন্ধ করার জন্য কম মার্জিত ctrl-C` পদ্ধতি ব্যবহার করি, তখন অন্য সমস্ত থ্রেডও সঙ্গে সঙ্গে বন্ধ হয়ে যায়, এমনকি যদি তারা কোনো রিকোয়েস্ট সার্ভ করার মাঝখানেও থাকে।
এরপর, আমরা Drop
ট্রেইট ইমপ্লিমেন্ট করব যাতে পুলের প্রতিটি থ্রেডের উপর join
কল করা যায় এবং তারা বন্ধ হওয়ার আগে তাদের হাতে থাকা রিকোয়েস্টগুলোর কাজ শেষ করতে পারে। তারপর আমরা থ্রেডগুলোকে জানানোর জন্য একটি উপায় ইমপ্লিমেন্ট করব যে তাদের নতুন রিকোয়েস্ট গ্রহণ করা বন্ধ করে শাটডাউন করা উচিত। এই কোডটি বাস্তবে দেখার জন্য, আমরা আমাদের সার্ভারটিকে এমনভাবে পরিবর্তন করব যাতে এটি তার থ্রেড পুল সুন্দরভাবে শাটডাউন করার আগে মাত্র দুটি রিকোয়েস্ট গ্রহণ করে।
এগোনোর সময় একটি বিষয় লক্ষ্য করার মতো: এই পরিবর্তনগুলোর কোনোটিই সেই কোডের অংশকে প্রভাবিত করবে না যা ক্লোজার এক্সিকিউট করার কাজ করে, তাই আমরা যদি একটি async runtime
-এর জন্য থ্রেড পুল ব্যবহার করতাম, তাহলেও সবকিছু একই থাকত।
ThreadPool
-এর উপর Drop
ট্রেইট ইমপ্লিমেন্ট করা
চলুন আমাদের থ্রেড পুলের উপর Drop
ইমপ্লিমেন্ট করার মাধ্যমে শুরু করি। যখন পুলটি ড্রপ করা হবে, আমাদের সমস্ত থ্রেড join
করা উচিত যাতে তারা তাদের কাজ শেষ করতে পারে। লিস্টিং ২১-২২-এ Drop
ইমপ্লিমেন্টেশনের একটি প্রথম প্রচেষ্টা দেখানো হয়েছে; এই কোডটি এখনও ঠিকভাবে কাজ করবে না।
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
প্রথমে আমরা থ্রেড পুলের প্রতিটি workers
-এর মধ্যে দিয়ে লুপ করি। আমরা এর জন্য &mut
ব্যবহার করি কারণ self
একটি মিউটেবল রেফারেন্স এবং আমাদের worker
-কেও মিউটেট করতে হবে। প্রতিটি worker
-এর জন্য, আমরা একটি বার্তা প্রিন্ট করি যে এই নির্দিষ্ট Worker
ইনস্ট্যান্সটি শাট ডাউন হচ্ছে, এবং তারপর আমরা সেই Worker
ইনস্ট্যান্সের থ্রেডে join
কল করি। যদি join
কল ব্যর্থ হয়, আমরা Rust-কে প্যানিক করতে এবং একটি অসুন্দর শাটডাউনে যেতে unwrap
ব্যবহার করি।
এই কোডটি কম্পাইল করার সময় আমরা যে এররটি পাই তা এখানে দেওয়া হলো:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/4eb161250e340c8f48f66e2b929ef4a5bed7c181/library/std/src/thread/mod.rs:1876:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
এররটি আমাদের বলছে যে আমরা join
কল করতে পারছি না কারণ আমাদের প্রতিটি worker
-এর শুধুমাত্র একটি মিউটেবল borrow আছে এবং join
তার আর্গুমেন্টের মালিকানা নিয়ে নেয়। এই সমস্যাটি সমাধান করার জন্য, আমাদের thread
টিকে Worker
ইনস্ট্যান্স থেকে মুভ করতে হবে যা thread
এর মালিক, যাতে join
থ্রেডটিকে কনজিউম করতে পারে। এটি করার একটি উপায় হলো লিস্টিং ১৮-১৫ তে আমরা যে পদ্ধতিটি নিয়েছিলাম সেটি গ্রহণ করা। যদি Worker
একটি Option<thread::JoinHandle<()>>
ধারণ করত, আমরা Option
-এর উপর take
মেথড কল করতে পারতাম যাতে ভ্যালুটি Some
ভ্যারিয়েন্ট থেকে মুভ করে তার জায়গায় একটি None
ভ্যারিয়েন্ট রেখে দেওয়া যায়। অন্য কথায়, একটি চলমান Worker
-এর thread
-এ একটি Some
ভ্যারিয়েন্ট থাকত এবং যখন আমরা একটি Worker
-কে পরিচ্ছন্ন করতে চাইতাম, তখন আমরা Some
-কে None
দিয়ে প্রতিস্থাপন করতাম যাতে Worker
-এর চালানোর জন্য কোনো থ্রেড না থাকে।
যাইহোক, এই পরিস্থিতিটি শুধুমাত্র Worker
ড্রপ করার সময়ই আসত। এর বিনিময়ে, আমাদের worker.thread
অ্যাক্সেস করার সময় সব জায়গায় একটি Option<thread::JoinHandle<()>>
নিয়ে কাজ করতে হতো। ইডিওম্যাটিক Rust Option
অনেক ব্যবহার করে, কিন্তু যখন আপনি নিজেকে এমন কিছুকে Option
-এ র্যাপ করতে দেখেন যা আপনি জানেন সবসময় উপস্থিত থাকবে, শুধুমাত্র এই ধরনের একটি workaround হিসেবে, তখন আপনার কোডকে আরও পরিষ্কার এবং কম এরর-প্রোন করতে বিকল্প পদ্ধতির সন্ধান করা একটি ভাল ধারণা।
এই ক্ষেত্রে, একটি ভালো বিকল্প বিদ্যমান: Vec::drain
মেথড। এটি একটি রেঞ্জ প্যারামিটার গ্রহণ করে যা ভেক্টর থেকে কোন আইটেমগুলো সরাতে হবে তা নির্দিষ্ট করে এবং সেই আইটেমগুলোর একটি ইটারেটর রিটার্ন করে। ..
রেঞ্জ সিনট্যাক্স পাস করলে ভেক্টর থেকে প্রতিটি ভ্যালু সরিয়ে দেওয়া হবে।
তাই আমাদের ThreadPool
drop
ইমপ্লিমেন্টেশনটি এভাবে আপডেট করতে হবে:
#![allow(unused)] fn main() { use std::{ sync::{Arc, Mutex, mpsc}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } } }
এটি কম্পাইলার এরর সমাধান করে এবং আমাদের কোডে অন্য কোনো পরিবর্তনের প্রয়োজন হয় না। লক্ষ্য করুন যে, যেহেতু প্যানিক করার সময় ড্রপ কল করা যেতে পারে, তাই unwrap
-ও প্যানিক করতে পারে এবং একটি ডাবল প্যানিক ঘটাতে পারে, যা প্রোগ্রামটি অবিলম্বে ক্র্যাশ করে এবং চলমান যেকোনো পরিচ্ছন্নতার কাজ শেষ করে দেয়। এটি একটি উদাহরণ প্রোগ্রামের জন্য ঠিক আছে, কিন্তু প্রোডাকশন কোডের জন্য সুপারিশ করা হয় না।
থ্রেডগুলোকে জব শোনা বন্ধ করার জন্য সংকেত দেওয়া
আমরা যে সমস্ত পরিবর্তন করেছি, তাতে আমাদের কোড কোনো ওয়ার্নিং ছাড়াই কম্পাইল হচ্ছে। যাইহোক, খারাপ খবর হলো এই কোডটি এখনও আমাদের কাঙ্ক্ষিত উপায়ে কাজ করছে না। মূল বিষয় হলো Worker
ইনস্ট্যান্সের থ্রেড দ্বারা চালিত ক্লোজারগুলোর লজিক: এই মুহূর্তে, আমরা join
কল করছি, কিন্তু এটি থ্রেডগুলোকে শাট ডাউন করবে না, কারণ তারা জব খোঁজার জন্য চিরতরে loop
করতে থাকে। যদি আমরা আমাদের drop
-এর বর্তমান ইমপ্লিমেন্টেশন দিয়ে আমাদের ThreadPool
ড্রপ করার চেষ্টা করি, তাহলে প্রধান থ্রেডটি চিরতরে ব্লক হয়ে যাবে, প্রথম থ্রেডটি শেষ হওয়ার জন্য অপেক্ষা করতে থাকবে।
এই সমস্যাটি সমাধান করার জন্য, আমাদের ThreadPool
drop
ইমপ্লিমেন্টেশনে একটি পরিবর্তন এবং তারপর Worker
লুপে একটি পরিবর্তন প্রয়োজন হবে।
প্রথমে আমরা ThreadPool
drop
ইমপ্লিমেন্টেশন পরিবর্তন করে থ্রেডগুলো শেষ হওয়ার জন্য অপেক্ষা করার আগে স্পষ্টভাবে sender
ড্রপ করব। লিস্টিং ২১-২৩ ThreadPool
-এ sender
-কে স্পষ্টভাবে ড্রপ করার পরিবর্তনগুলো দেখায়। থ্রেডের মতো নয়, এখানে আমাদের sender
-কে ThreadPool
থেকে Option::take
দিয়ে মুভ করার জন্য একটি Option
ব্যবহার করতে হবে।
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
// --snip--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
sender
ড্রপ করা চ্যানেলটি বন্ধ করে দেয়, যা নির্দেশ করে যে আর কোনো মেসেজ পাঠানো হবে না। যখন এটি ঘটে, তখন Worker
ইনস্ট্যান্সগুলো ইনফিনিট লুপে যে recv
কলগুলো করে, সেগুলি সব একটি এরর রিটার্ন করবে। লিস্টিং ২১-২৪-এ, আমরা Worker
লুপটি পরিবর্তন করি যাতে সেই ক্ষেত্রে লুপ থেকে সুন্দরভাবে প্রস্থান করা যায়, যার মানে হলো ThreadPool
drop
ইমপ্লিমেন্টেশন যখন তাদের উপর join
কল করবে তখন থ্রেডগুলো শেষ হয়ে যাবে।
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker { id, thread }
}
}
এই কোডটি বাস্তবে দেখার জন্য, চলুন main
পরিবর্তন করি যাতে সার্ভারটি সুন্দরভাবে শাট ডাউন করার আগে মাত্র দুটি রিকোয়েস্ট গ্রহণ করে, যেমনটি লিস্টিং ২১-২৫ এ দেখানো হয়েছে।
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
আপনি চাইবেন না যে একটি বাস্তব-বিশ্বের ওয়েব সার্ভার মাত্র দুটি রিকোয়েস্ট সার্ভ করার পর শাট ডাউন হয়ে যাক। এই কোডটি শুধু দেখাচ্ছে যে সুন্দরভাবে শাটডাউন এবং পরিচ্ছন্নতার প্রক্রিয়াটি সঠিকভাবে কাজ করছে।
take
মেথডটি Iterator
ট্রেইটে সংজ্ঞায়িত এবং এটি ইটারেশনকে সর্বোচ্চ প্রথম দুটি আইটেমে সীমাবদ্ধ করে। ThreadPool
main
-এর শেষে স্কোপের বাইরে চলে যাবে, এবং drop
ইমপ্লিমেন্টেশনটি চলবে।
cargo run
দিয়ে সার্ভারটি শুরু করুন, এবং তিনটি রিকোয়েস্ট করুন। তৃতীয় রিকোয়েস্টটি এরর দেওয়া উচিত, এবং আপনার টার্মিনালে আপনি এই ধরনের আউটপুট দেখতে পাবেন:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
আপনি হয়তো Worker
আইডি এবং প্রিন্ট করা মেসেজগুলোর একটি ভিন্ন ক্রম দেখতে পারেন। আমরা মেসেজগুলো থেকে দেখতে পাচ্ছি এই কোডটি কীভাবে কাজ করে: Worker
ইনস্ট্যান্স 0 এবং 3 প্রথম দুটি রিকোয়েস্ট পেয়েছে। দ্বিতীয় কানেকশনের পরে সার্ভার কানেকশন গ্রহণ করা বন্ধ করে দিয়েছে, এবং ThreadPool
-এর উপর Drop
ইমপ্লিমেন্টেশনটি Worker
3 তার জব শুরু করার আগেই চালানো শুরু হয়। sender
ড্রপ করা সমস্ত Worker
ইনস্ট্যান্সকে ডিসকানেক্ট করে এবং তাদের শাট ডাউন হতে বলে। Worker
ইনস্ট্যান্সগুলো প্রত্যেকেই ডিসকানেক্ট হওয়ার সময় একটি মেসেজ প্রিন্ট করে, এবং তারপর থ্রেড পুল প্রতিটি Worker
থ্রেড শেষ হওয়ার জন্য অপেক্ষা করতে join
কল করে।
এই নির্দিষ্ট এক্সিকিউশনের একটি আকর্ষণীয় দিক লক্ষ্য করুন: ThreadPool
sender
-কে ড্রপ করেছে, এবং কোনো Worker
এরর পাওয়ার আগেই, আমরা Worker
0 কে join
করার চেষ্টা করেছি। Worker
0 এখনও recv
থেকে কোনো এরর পায়নি, তাই প্রধান থ্রেডটি Worker
0 শেষ হওয়ার জন্য অপেক্ষা করতে ব্লক হয়ে গেছে। এই সময়ের মধ্যে, Worker
3 একটি জব পেয়েছে এবং তারপর সমস্ত থ্রেড একটি এরর পেয়েছে। যখন Worker
0 শেষ হয়েছে, প্রধান থ্রেডটি বাকি Worker
ইনস্ট্যান্সগুলো শেষ হওয়ার জন্য অপেক্ষা করেছে। সেই সময়ে, তারা সবাই তাদের লুপ থেকে বেরিয়ে গেছে এবং থেমে গেছে।
অভিনন্দন! আমরা এখন আমাদের প্রজেক্ট সম্পন্ন করেছি; আমাদের একটি বেসিক ওয়েব সার্ভার আছে যা অ্যাসিঙ্ক্রোনাসভাবে সাড়া দেওয়ার জন্য একটি থ্রেড পুল ব্যবহার করে। আমরা সার্ভারের একটি সুন্দর শাটডাউন সম্পাদন করতে সক্ষম, যা পুলের সমস্ত থ্রেড পরিচ্ছন্ন করে।
রেফারেন্সের জন্য এখানে সম্পূর্ণ কোড দেওয়া হলো:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
আমরা এখানে আরও অনেক কিছু করতে পারি! আপনি যদি এই প্রজেক্টটি আরও উন্নত করতে চান, তবে এখানে কিছু ধারণা দেওয়া হলো:
ThreadPool
এবং এর পাবলিক মেথডগুলিতে আরও ডকুমেন্টেশন যোগ করুন।- লাইব্রেরির কার্যকারিতার জন্য টেস্ট যোগ করুন।
unwrap
কলগুলোকে আরও শক্তিশালী এরর হ্যান্ডলিং দিয়ে পরিবর্তন করুন।- ওয়েব রিকোয়েস্ট সার্ভ করা ছাড়া অন্য কোনো কাজ সম্পাদন করার জন্য
ThreadPool
ব্যবহার করুন। - crates.io তে একটি থ্রেড পুল ক্রেট খুঁজুন এবং সেই ক্রেট ব্যবহার করে একটি অনুরূপ ওয়েব সার্ভার ইমপ্লিমেন্ট করুন। তারপর এর API এবং শক্তিশালীতার সাথে আমাদের ইমপ্লিমেন্ট করা থ্রেড পুলের তুলনা করুন।
সারাংশ
চমৎকার! আপনি বইয়ের শেষ পর্যন্ত পৌঁছে গেছেন! Rust-এর এই সফরে আমাদের সাথে যোগ দেওয়ার জন্য আমরা আপনাকে ধন্যবাদ জানাতে চাই। আপনি এখন আপনার নিজের Rust প্রজেক্ট ইমপ্লিমেন্ট করতে এবং অন্য লোকের প্রজেক্টে সাহায্য করতে প্রস্তুত। মনে রাখবেন যে অন্যান্য Rustaceans-দের একটি স্বাগত জানানো কমিউনিটি রয়েছে যারা আপনার Rust যাত্রায় যেকোনো চ্যালেঞ্জের সম্মুখীন হলে আপনাকে সাহায্য করতে ভালোবাসবে।