গ্রেসফুল শাটডাউন এবং ক্লিনআপ (Graceful Shutdown and Cleanup)

Listing 21-20-এর কোডটি থ্রেড পুল ব্যবহার করে অ্যাসিঙ্ক্রোনাসভাবে রিকোয়েস্টগুলির প্রতিক্রিয়া জানাচ্ছে, যেমনটি আমরা চেয়েছিলাম। আমরা workers, id, এবং thread ফিল্ড সম্পর্কে কিছু সতর্কতা পাচ্ছি যা আমরা সরাসরি ব্যবহার করছি না, যা আমাদের মনে করিয়ে দেয় যে আমরা কোনও কিছু পরিষ্কার করছি না। যখন আমরা main থ্রেড বন্ধ করার জন্য কম মার্জিত ctrl-c পদ্ধতি ব্যবহার করি, তখন অন্যান্য সমস্ত থ্রেডও অবিলম্বে বন্ধ হয়ে যায়, এমনকি যদি তারা কোনও রিকোয়েস্ট প্রক্রিয়া করার মাঝখানে থাকে তাহলেও।

এরপর, আমরা পুলের প্রতিটি থ্রেডে join কল করার জন্য Drop ট্রেইট ইমপ্লিমেন্ট করব যাতে সেগুলি বন্ধ হওয়ার আগে যে রিকোয়েস্টগুলিতে কাজ করছে সেগুলি শেষ করতে পারে। তারপর আমরা থ্রেডগুলিকে নতুন রিকোয়েস্ট গ্রহণ করা বন্ধ করতে এবং বন্ধ করার একটি উপায় ইমপ্লিমেন্ট করব। এই কোডটি কার্যকর অবস্থায় দেখতে, আমরা আমাদের সার্ভারকে পরিবর্তন করব যাতে এটি তার থ্রেড পুলটি সুন্দরভাবে বন্ধ করার আগে কেবল দুটি রিকোয়েস্ট গ্রহণ করে।

এখানে একটি লক্ষণীয় বিষয়: এগুলোর কোনোটিই কোডের সেই অংশগুলিকে প্রভাবিত করে না যা ক্লোজার এক্সিকিউট করে, তাই আমরা যদি অ্যাসিঙ্ক রানটাইমের জন্য একটি থ্রেড পুল ব্যবহার করতাম তাহলেও এখানে সবকিছু একই থাকত।

ThreadPool-এ Drop ট্রেইট ইমপ্লিমেন্ট করা (Implementing the DropTrait onThreadPool`)

আসুন আমাদের থ্রেড পুলে Drop ইমপ্লিমেন্ট করে শুরু করি। পুলটি ড্রপ করা হলে, আমাদের থ্রেডগুলি সব join করা উচিত যাতে তারা তাদের কাজ শেষ করতে পারে। Listing 21-22 Drop-এর একটি প্রথম প্রচেষ্টা দেখায়; এই কোডটি এখনও ঠিকঠাক কাজ করবে না।

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

প্রথমে, আমরা থ্রেড পুলের প্রতিটি workers-এর মধ্যে লুপ করি। আমরা এর জন্য &mut ব্যবহার করি কারণ self হল একটি মিউটেবল রেফারেন্স, এবং আমাদের worker-কেও মিউটেট করতে হবে। প্রতিটি কর্মীর জন্য, আমরা একটি মেসেজ প্রিন্ট করি যে এই নির্দিষ্ট Worker ইনস্ট্যান্সটি বন্ধ হচ্ছে এবং তারপর আমরা সেই Worker ইনস্ট্যান্সের থ্রেডে join কল করি। যদি join-এ কল ব্যর্থ হয়, তাহলে Rust-কে প্যানিক করতে এবং একটি অমার্জিত শাটডাউনে যেতে আমরা unwrap ব্যবহার করি।

আমরা যখন এই কোডটি কম্পাইল করি তখন এই error টি পাই:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
    --> src/lib.rs:52:13
     |
52   |             worker.thread.join().unwrap();
     |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
     |             |
     |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
     |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
    --> file:///home/.rustup/toolchains/1.85/lib/rustlib/src/rust/library/std/src/thread/mod.rs:1876:17
     |
1876 |     pub fn join(self) -> Result<T> {
     |                 ^^^^

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error

Error টি আমাদের বলে যে আমরা join কল করতে পারি না কারণ আমাদের কাছে প্রতিটি worker-এর শুধুমাত্র একটি মিউটেবল বরো রয়েছে এবং join তার আর্গুমেন্টের ownership নেয়। এই সমস্যাটি সমাধান করার জন্য, আমাদের thread-কে Worker ইনস্ট্যান্স থেকে সরিয়ে নিতে হবে যা thread-এর মালিক, যাতে join থ্রেডটিকে ব্যবহার করতে পারে। Listing 18-15-এ আমরা যেমন করেছিলাম, তেমন একটি উপায় ব্যবহার করা যেতে পারে। যদি Worker একটি Option<thread::JoinHandle<()>> রাখত, তাহলে আমরা Option-এ take মেথড কল করে Some ভেরিয়েন্ট থেকে মানটি সরিয়ে নিতে পারতাম এবং এর জায়গায় একটি None ভেরিয়েন্ট রেখে যেতে পারতাম। অন্য কথায়, একটি Worker যেটি চলছে তার thread-এ একটি Some ভেরিয়েন্ট থাকবে এবং যখন আমরা একটি Worker পরিষ্কার করতে চাইব, তখন আমরা Some-কে None দিয়ে প্রতিস্থাপন করব যাতে Worker-এর চালানোর জন্য কোনও থ্রেড না থাকে।

যাইহোক, এটি শুধুমাত্র তখনই আসবে যখন Worker ড্রপ করা হবে। বিনিময়ে, আমরা যেখানেই worker.thread অ্যাক্সেস করি সেখানেই আমাদের একটি Option<thread::JoinHandle<()>> নিয়ে কাজ করতে হবে। ইডিওমেটিক রাস্ট Option বেশ কিছুটা ব্যবহার করে, কিন্তু আপনি যখন নিজেকে এমন কিছু র‍্যাপ করতে দেখবেন যা আপনি জানেন যে Option-এ সবসময় উপস্থিত থাকবে, তখন এটি একটি বিকল্প পদ্ধতির সন্ধান করার জন্য একটি ভাল ধারণা। সেগুলি আপনার কোডকে আরও পরিষ্কার এবং কম ত্রুটি-প্রবণ করে তুলতে পারে।

এই ক্ষেত্রে, একটি ভাল বিকল্প বিদ্যমান: Vec::drain মেথড। এটি Vec থেকে কোন আইটেমগুলি সরাতে হবে তা নির্দিষ্ট করার জন্য একটি রেঞ্জ প্যারামিটার গ্রহণ করে এবং সেই আইটেমগুলির একটি ইটারেটর রিটার্ন করে। .. রেঞ্জ সিনট্যাক্স পাস করলে Vec থেকে সমস্ত মান সরানো হবে।

সুতরাং আমাদের ThreadPool-এর drop ইমপ্লিমেন্টেশন এইভাবে আপডেট করতে হবে:

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
}

এটি কম্পাইলার error সমাধান করে এবং আমাদের কোডে অন্য কোনও পরিবর্তনের প্রয়োজন হয় না।

থ্রেডগুলিকে সংকেত দেওয়া যাতে কাজের জন্য শোনা বন্ধ করা যায় (Signaling to the Threads to Stop Listening for Jobs)

আমরা যে সমস্ত পরিবর্তন করেছি, তাতে আমাদের কোড কোনও সতর্কতা ছাড়াই কম্পাইল হয়। যাইহোক, খারাপ খবর হল এই কোডটি এখনও আমাদের ইচ্ছামতো কাজ করে না। মূল বিষয় হল Worker ইনস্ট্যান্সগুলির থ্রেড দ্বারা চালানো ক্লোজারগুলির লজিক: এই মুহূর্তে, আমরা join কল করি, কিন্তু সেটি থ্রেডগুলিকে বন্ধ করবে না কারণ তারা কাজের জন্য চিরকাল loop করে। আমরা যদি আমাদের বর্তমান drop-এর ইমপ্লিমেন্টেশন দিয়ে আমাদের ThreadPool ড্রপ করার চেষ্টা করি, তাহলে main থ্রেড চিরকাল ব্লক হয়ে থাকবে, প্রথম থ্রেডটি শেষ হওয়ার জন্য অপেক্ষা করবে।

এই সমস্যাটি সমাধান করার জন্য, আমাদের ThreadPool-এর drop ইমপ্লিমেন্টেশনে একটি পরিবর্তন এবং তারপর Worker লুপে একটি পরিবর্তন প্রয়োজন।

প্রথমে আমরা থ্রেডগুলি শেষ হওয়ার জন্য অপেক্ষা করার আগে sender ড্রপ করার জন্য ThreadPool-এর drop ইমপ্লিমেন্টেশন পরিবর্তন করব। Listing 21-23 ThreadPool-এ sender কে স্পষ্টভাবে ড্রপ করার পরিবর্তনগুলি দেখায়। থ্রেডের বিপরীতে, এখানে আমাদের Option::take দিয়ে ThreadPool থেকে sender-কে সরানোর জন্য একটি Option ব্যবহার করতে হবে

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        // --snip--

        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

sender ড্রপ করা চ্যানেলটি বন্ধ করে দেয়, যা নির্দেশ করে যে আর কোনও মেসেজ পাঠানো হবে না। যখন এটি ঘটে, তখন Worker ইনস্ট্যান্সগুলি অসীম লুপে যে সমস্ত recv কল করে সেগুলি একটি error রিটার্ন করবে। Listing 21-24-এ, আমরা সেই ক্ষেত্রে লুপ থেকে সুন্দরভাবে বেরিয়ে আসার জন্য Worker লুপ পরিবর্তন করি, যার মানে থ্রেডগুলি শেষ হবে যখন ThreadPool-এর drop ইমপ্লিমেন্টেশন তাদের উপর join কল করবে।

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker { id, thread }
    }
}

এই কোডটি কার্যকর অবস্থায় দেখতে, আসুন Listing 21-25-এ দেখানো মতো দুটি রিকোয়েস্ট পরিবেশন করার পরে সার্ভারটিকে সুন্দরভাবে বন্ধ করার জন্য main পরিবর্তন করি।

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

আপনি একটি বাস্তব-বিশ্বের ওয়েব সার্ভারকে শুধুমাত্র দুটি রিকোয়েস্ট পরিবেশন করার পরে বন্ধ করতে চাইবেন না। এই কোডটি কেবল দেখায় যে গ্রেসফুল শাটডাউন এবং ক্লিনআপ কাজ করছে।

take মেথডটি Iterator ট্রেইটে সংজ্ঞায়িত করা হয়েছে এবং পুনরাবৃত্তিকে সর্বাধিক প্রথম দুটি আইটেমের মধ্যে সীমাবদ্ধ করে। main-এর শেষে ThreadPool স্কোপের বাইরে চলে যাবে এবং drop ইমপ্লিমেন্টেশন চলবে।

cargo run দিয়ে সার্ভার শুরু করুন এবং তিনটি রিকোয়েস্ট করুন। তৃতীয় রিকোয়েস্টটিতে error হওয়া উচিত এবং আপনার টার্মিনালে আপনি এইরকম আউটপুট দেখতে পাবেন:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

আপনি Worker আইডি এবং প্রিন্ট করা মেসেজগুলির একটি ভিন্ন ক্রম দেখতে পারেন। আমরা দেখতে পাচ্ছি কিভাবে এই কোডটি মেসেজগুলি থেকে কাজ করে: Worker ইনস্ট্যান্স 0 এবং 3 প্রথম দুটি রিকোয়েস্ট পেয়েছে। সার্ভারটি দ্বিতীয় কানেকশনের পরে কানেকশন গ্রহণ করা বন্ধ করে দিয়েছে এবং Worker 3 এমনকি তার কাজ শুরু করার আগেই ThreadPool-এ Drop ইমপ্লিমেন্টেশন চলতে শুরু করে। sender ড্রপ করা সমস্ত Worker ইনস্ট্যান্সকে ডিসকানেক্ট করে এবং তাদের বন্ধ করতে বলে। Worker ইনস্ট্যান্সগুলি প্রত্যেকে ডিসকানেক্ট করার সময় একটি মেসেজ প্রিন্ট করে এবং তারপর থ্রেড পুল প্রতিটি Worker থ্রেড শেষ হওয়ার জন্য অপেক্ষা করতে join কল করে।

এই বিশেষ এক্সিকিউশনের একটি আকর্ষণীয় দিক লক্ষ্য করুন: ThreadPool sender-কে ড্রপ করেছে এবং কোনও Worker ত্রুটি পাওয়ার আগে, আমরা Worker 0-তে join করার চেষ্টা করেছি। Worker 0 এখনও recv থেকে কোনও ত্রুটি পায়নি, তাই main থ্রেড Worker 0 শেষ হওয়ার জন্য অপেক্ষা করতে ব্লক করে। ইতিমধ্যে, Worker 3 একটি কাজ পেয়েছে এবং তারপর সমস্ত থ্রেড একটি ত্রুটি পেয়েছে। যখন Worker 0 শেষ হয়, তখন main থ্রেড বাকি Worker ইনস্ট্যান্সগুলি শেষ হওয়ার জন্য অপেক্ষা করে। সেই সময়ে, তারা সবাই তাদের লুপ থেকে বেরিয়ে এসেছে এবং বন্ধ হয়ে গেছে।

অভিনন্দন! আমরা এখন আমাদের প্রোজেক্ট সম্পন্ন করেছি; আমাদের কাছে একটি বেসিক ওয়েব সার্ভার রয়েছে যা অ্যাসিঙ্ক্রোনাসভাবে প্রতিক্রিয়া জানাতে একটি থ্রেড পুল ব্যবহার করে। আমরা সার্ভারের একটি গ্রেসফুল শাটডাউন করতে সক্ষম, যা পুলের সমস্ত থ্রেড পরিষ্কার করে।

রেফারেন্সের জন্য এখানে সম্পূর্ণ কোড রয়েছে:

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

আমরা এখানে আরও কিছু করতে পারি! আপনি যদি এই প্রোজেক্টটি উন্নত করতে চান তবে এখানে কিছু ধারণা রয়েছে:

  • ThreadPool এবং এর পাবলিক মেথডগুলিতে আরও ডকুমেন্টেশন যুক্ত করুন।
  • লাইব্রেরির কার্যকারিতার পরীক্ষা যুক্ত করুন।
  • unwrap কলগুলিকে আরও শক্তিশালী ত্রুটি হ্যান্ডলিংয়ে পরিবর্তন করুন।
  • ওয়েব রিকোয়েস্ট পরিবেশন করা ছাড়া অন্য কোনও কাজ সম্পাদন করতে ThreadPool ব্যবহার করুন।
  • crates.io-এ একটি থ্রেড পুল ক্রেট খুঁজুন এবং পরিবর্তে ক্রেট ব্যবহার করে একটি অনুরূপ ওয়েব সার্ভার ইমপ্লিমেন্ট করুন। তারপর এর API এবং দৃঢ়তা আমরা ইমপ্লিমেন্ট করা থ্রেড পুলের সাথে তুলনা করুন।

সারসংক্ষেপ (Summary)

সাবাশ! আপনি বইটির শেষে পৌঁছে গেছেন! Rust-এর এই সফরে আমাদের সাথে যোগ দেওয়ার জন্য আমরা আপনাকে ধন্যবাদ জানাতে চাই। আপনি এখন আপনার নিজের Rust প্রোজেক্টগুলি ইমপ্লিমেন্ট করতে এবং অন্য লোকেদের প্রোজেক্টে সাহায্য করতে প্রস্তুত। মনে রাখবেন যে অন্যান্য Rustaceans-দের একটি স্বাগত জানানোর মতো কমিউনিটি রয়েছে যারা আপনার Rust যাত্রায় আপনার সম্মুখীন হওয়া যেকোনো চ্যালেঞ্জে আপনাকে সাহায্য করতে পারলে খুশি হবে।