Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

সুন্দরভাবে শাটডাউন এবং পরিচ্ছন্নতা

লিস্টিং ২১-২০ এর কোডটি আমাদের উদ্দেশ্য অনুযায়ী, একটি থ্রেড পুল ব্যবহার করে অ্যাসিঙ্ক্রোনাসভাবে রিকোয়েস্টের জবাব দিচ্ছে। আমরা workers, id, এবং thread ফিল্ডগুলো সরাসরি ব্যবহার করছি না বলে কিছু ওয়ার্নিং পাচ্ছি, যা আমাদের মনে করিয়ে দিচ্ছে যে আমরা কোনো কিছুই পরিচ্ছন্ন (cleanup) করছি না। যখন আমরা প্রধান থ্রেডটি বন্ধ করার জন্য কম মার্জিত ctrl-C` পদ্ধতি ব্যবহার করি, তখন অন্য সমস্ত থ্রেডও সঙ্গে সঙ্গে বন্ধ হয়ে যায়, এমনকি যদি তারা কোনো রিকোয়েস্ট সার্ভ করার মাঝখানেও থাকে।

এরপর, আমরা Drop ট্রেইট ইমপ্লিমেন্ট করব যাতে পুলের প্রতিটি থ্রেডের উপর join কল করা যায় এবং তারা বন্ধ হওয়ার আগে তাদের হাতে থাকা রিকোয়েস্টগুলোর কাজ শেষ করতে পারে। তারপর আমরা থ্রেডগুলোকে জানানোর জন্য একটি উপায় ইমপ্লিমেন্ট করব যে তাদের নতুন রিকোয়েস্ট গ্রহণ করা বন্ধ করে শাটডাউন করা উচিত। এই কোডটি বাস্তবে দেখার জন্য, আমরা আমাদের সার্ভারটিকে এমনভাবে পরিবর্তন করব যাতে এটি তার থ্রেড পুল সুন্দরভাবে শাটডাউন করার আগে মাত্র দুটি রিকোয়েস্ট গ্রহণ করে।

এগোনোর সময় একটি বিষয় লক্ষ্য করার মতো: এই পরিবর্তনগুলোর কোনোটিই সেই কোডের অংশকে প্রভাবিত করবে না যা ক্লোজার এক্সিকিউট করার কাজ করে, তাই আমরা যদি একটি async runtime-এর জন্য থ্রেড পুল ব্যবহার করতাম, তাহলেও সবকিছু একই থাকত।

ThreadPool-এর উপর Drop ট্রেইট ইমপ্লিমেন্ট করা

চলুন আমাদের থ্রেড পুলের উপর Drop ইমপ্লিমেন্ট করার মাধ্যমে শুরু করি। যখন পুলটি ড্রপ করা হবে, আমাদের সমস্ত থ্রেড join করা উচিত যাতে তারা তাদের কাজ শেষ করতে পারে। লিস্টিং ২১-২২-এ Drop ইমপ্লিমেন্টেশনের একটি প্রথম প্রচেষ্টা দেখানো হয়েছে; এই কোডটি এখনও ঠিকভাবে কাজ করবে না।

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

প্রথমে আমরা থ্রেড পুলের প্রতিটি workers-এর মধ্যে দিয়ে লুপ করি। আমরা এর জন্য &mut ব্যবহার করি কারণ self একটি মিউটেবল রেফারেন্স এবং আমাদের worker-কেও মিউটেট করতে হবে। প্রতিটি worker-এর জন্য, আমরা একটি বার্তা প্রিন্ট করি যে এই নির্দিষ্ট Worker ইনস্ট্যান্সটি শাট ডাউন হচ্ছে, এবং তারপর আমরা সেই Worker ইনস্ট্যান্সের থ্রেডে join কল করি। যদি join কল ব্যর্থ হয়, আমরা Rust-কে প্যানিক করতে এবং একটি অসুন্দর শাটডাউনে যেতে unwrap ব্যবহার করি।

এই কোডটি কম্পাইল করার সময় আমরা যে এররটি পাই তা এখানে দেওয়া হলো:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
   |             |
   |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
   |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
  --> /rustc/4eb161250e340c8f48f66e2b929ef4a5bed7c181/library/std/src/thread/mod.rs:1876:17

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error

এররটি আমাদের বলছে যে আমরা join কল করতে পারছি না কারণ আমাদের প্রতিটি worker-এর শুধুমাত্র একটি মিউটেবল borrow আছে এবং join তার আর্গুমেন্টের মালিকানা নিয়ে নেয়। এই সমস্যাটি সমাধান করার জন্য, আমাদের thread টিকে Worker ইনস্ট্যান্স থেকে মুভ করতে হবে যা thread এর মালিক, যাতে join থ্রেডটিকে কনজিউম করতে পারে। এটি করার একটি উপায় হলো লিস্টিং ১৮-১৫ তে আমরা যে পদ্ধতিটি নিয়েছিলাম সেটি গ্রহণ করা। যদি Worker একটি Option<thread::JoinHandle<()>> ধারণ করত, আমরা Option-এর উপর take মেথড কল করতে পারতাম যাতে ভ্যালুটি Some ভ্যারিয়েন্ট থেকে মুভ করে তার জায়গায় একটি None ভ্যারিয়েন্ট রেখে দেওয়া যায়। অন্য কথায়, একটি চলমান Worker-এর thread-এ একটি Some ভ্যারিয়েন্ট থাকত এবং যখন আমরা একটি Worker-কে পরিচ্ছন্ন করতে চাইতাম, তখন আমরা Some-কে None দিয়ে প্রতিস্থাপন করতাম যাতে Worker-এর চালানোর জন্য কোনো থ্রেড না থাকে।

যাইহোক, এই পরিস্থিতিটি শুধুমাত্র Worker ড্রপ করার সময়ই আসত। এর বিনিময়ে, আমাদের worker.thread অ্যাক্সেস করার সময় সব জায়গায় একটি Option<thread::JoinHandle<()>> নিয়ে কাজ করতে হতো। ইডিওম্যাটিক Rust Option অনেক ব্যবহার করে, কিন্তু যখন আপনি নিজেকে এমন কিছুকে Option-এ র‍্যাপ করতে দেখেন যা আপনি জানেন সবসময় উপস্থিত থাকবে, শুধুমাত্র এই ধরনের একটি workaround হিসেবে, তখন আপনার কোডকে আরও পরিষ্কার এবং কম এরর-প্রোন করতে বিকল্প পদ্ধতির সন্ধান করা একটি ভাল ধারণা।

এই ক্ষেত্রে, একটি ভালো বিকল্প বিদ্যমান: Vec::drain মেথড। এটি একটি রেঞ্জ প্যারামিটার গ্রহণ করে যা ভেক্টর থেকে কোন আইটেমগুলো সরাতে হবে তা নির্দিষ্ট করে এবং সেই আইটেমগুলোর একটি ইটারেটর রিটার্ন করে। .. রেঞ্জ সিনট্যাক্স পাস করলে ভেক্টর থেকে প্রতিটি ভ্যালু সরিয়ে দেওয়া হবে।

তাই আমাদের ThreadPool drop ইমপ্লিমেন্টেশনটি এভাবে আপডেট করতে হবে:

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
}

এটি কম্পাইলার এরর সমাধান করে এবং আমাদের কোডে অন্য কোনো পরিবর্তনের প্রয়োজন হয় না। লক্ষ্য করুন যে, যেহেতু প্যানিক করার সময় ড্রপ কল করা যেতে পারে, তাই unwrap-ও প্যানিক করতে পারে এবং একটি ডাবল প্যানিক ঘটাতে পারে, যা প্রোগ্রামটি অবিলম্বে ক্র্যাশ করে এবং চলমান যেকোনো পরিচ্ছন্নতার কাজ শেষ করে দেয়। এটি একটি উদাহরণ প্রোগ্রামের জন্য ঠিক আছে, কিন্তু প্রোডাকশন কোডের জন্য সুপারিশ করা হয় না।

থ্রেডগুলোকে জব শোনা বন্ধ করার জন্য সংকেত দেওয়া

আমরা যে সমস্ত পরিবর্তন করেছি, তাতে আমাদের কোড কোনো ওয়ার্নিং ছাড়াই কম্পাইল হচ্ছে। যাইহোক, খারাপ খবর হলো এই কোডটি এখনও আমাদের কাঙ্ক্ষিত উপায়ে কাজ করছে না। মূল বিষয় হলো Worker ইনস্ট্যান্সের থ্রেড দ্বারা চালিত ক্লোজারগুলোর লজিক: এই মুহূর্তে, আমরা join কল করছি, কিন্তু এটি থ্রেডগুলোকে শাট ডাউন করবে না, কারণ তারা জব খোঁজার জন্য চিরতরে loop করতে থাকে। যদি আমরা আমাদের drop-এর বর্তমান ইমপ্লিমেন্টেশন দিয়ে আমাদের ThreadPool ড্রপ করার চেষ্টা করি, তাহলে প্রধান থ্রেডটি চিরতরে ব্লক হয়ে যাবে, প্রথম থ্রেডটি শেষ হওয়ার জন্য অপেক্ষা করতে থাকবে।

এই সমস্যাটি সমাধান করার জন্য, আমাদের ThreadPool drop ইমপ্লিমেন্টেশনে একটি পরিবর্তন এবং তারপর Worker লুপে একটি পরিবর্তন প্রয়োজন হবে।

প্রথমে আমরা ThreadPool drop ইমপ্লিমেন্টেশন পরিবর্তন করে থ্রেডগুলো শেষ হওয়ার জন্য অপেক্ষা করার আগে স্পষ্টভাবে sender ড্রপ করব। লিস্টিং ২১-২৩ ThreadPool-এ sender-কে স্পষ্টভাবে ড্রপ করার পরিবর্তনগুলো দেখায়। থ্রেডের মতো নয়, এখানে আমাদের sender-কে ThreadPool থেকে Option::take দিয়ে মুভ করার জন্য একটি Option ব্যবহার করতে হবে

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        // --snip--

        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

sender ড্রপ করা চ্যানেলটি বন্ধ করে দেয়, যা নির্দেশ করে যে আর কোনো মেসেজ পাঠানো হবে না। যখন এটি ঘটে, তখন Worker ইনস্ট্যান্সগুলো ইনফিনিট লুপে যে recv কলগুলো করে, সেগুলি সব একটি এরর রিটার্ন করবে। লিস্টিং ২১-২৪-এ, আমরা Worker লুপটি পরিবর্তন করি যাতে সেই ক্ষেত্রে লুপ থেকে সুন্দরভাবে প্রস্থান করা যায়, যার মানে হলো ThreadPool drop ইমপ্লিমেন্টেশন যখন তাদের উপর join কল করবে তখন থ্রেডগুলো শেষ হয়ে যাবে।

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker { id, thread }
    }
}

এই কোডটি বাস্তবে দেখার জন্য, চলুন main পরিবর্তন করি যাতে সার্ভারটি সুন্দরভাবে শাট ডাউন করার আগে মাত্র দুটি রিকোয়েস্ট গ্রহণ করে, যেমনটি লিস্টিং ২১-২৫ এ দেখানো হয়েছে।

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

আপনি চাইবেন না যে একটি বাস্তব-বিশ্বের ওয়েব সার্ভার মাত্র দুটি রিকোয়েস্ট সার্ভ করার পর শাট ডাউন হয়ে যাক। এই কোডটি শুধু দেখাচ্ছে যে সুন্দরভাবে শাটডাউন এবং পরিচ্ছন্নতার প্রক্রিয়াটি সঠিকভাবে কাজ করছে।

take মেথডটি Iterator ট্রেইটে সংজ্ঞায়িত এবং এটি ইটারেশনকে সর্বোচ্চ প্রথম দুটি আইটেমে সীমাবদ্ধ করে। ThreadPool main-এর শেষে স্কোপের বাইরে চলে যাবে, এবং drop ইমপ্লিমেন্টেশনটি চলবে।

cargo run দিয়ে সার্ভারটি শুরু করুন, এবং তিনটি রিকোয়েস্ট করুন। তৃতীয় রিকোয়েস্টটি এরর দেওয়া উচিত, এবং আপনার টার্মিনালে আপনি এই ধরনের আউটপুট দেখতে পাবেন:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

আপনি হয়তো Worker আইডি এবং প্রিন্ট করা মেসেজগুলোর একটি ভিন্ন ক্রম দেখতে পারেন। আমরা মেসেজগুলো থেকে দেখতে পাচ্ছি এই কোডটি কীভাবে কাজ করে: Worker ইনস্ট্যান্স 0 এবং 3 প্রথম দুটি রিকোয়েস্ট পেয়েছে। দ্বিতীয় কানেকশনের পরে সার্ভার কানেকশন গ্রহণ করা বন্ধ করে দিয়েছে, এবং ThreadPool-এর উপর Drop ইমপ্লিমেন্টেশনটি Worker 3 তার জব শুরু করার আগেই চালানো শুরু হয়। sender ড্রপ করা সমস্ত Worker ইনস্ট্যান্সকে ডিসকানেক্ট করে এবং তাদের শাট ডাউন হতে বলে। Worker ইনস্ট্যান্সগুলো প্রত্যেকেই ডিসকানেক্ট হওয়ার সময় একটি মেসেজ প্রিন্ট করে, এবং তারপর থ্রেড পুল প্রতিটি Worker থ্রেড শেষ হওয়ার জন্য অপেক্ষা করতে join কল করে।

এই নির্দিষ্ট এক্সিকিউশনের একটি আকর্ষণীয় দিক লক্ষ্য করুন: ThreadPool sender-কে ড্রপ করেছে, এবং কোনো Worker এরর পাওয়ার আগেই, আমরা Worker 0 কে join করার চেষ্টা করেছি। Worker 0 এখনও recv থেকে কোনো এরর পায়নি, তাই প্রধান থ্রেডটি Worker 0 শেষ হওয়ার জন্য অপেক্ষা করতে ব্লক হয়ে গেছে। এই সময়ের মধ্যে, Worker 3 একটি জব পেয়েছে এবং তারপর সমস্ত থ্রেড একটি এরর পেয়েছে। যখন Worker 0 শেষ হয়েছে, প্রধান থ্রেডটি বাকি Worker ইনস্ট্যান্সগুলো শেষ হওয়ার জন্য অপেক্ষা করেছে। সেই সময়ে, তারা সবাই তাদের লুপ থেকে বেরিয়ে গেছে এবং থেমে গেছে।

অভিনন্দন! আমরা এখন আমাদের প্রজেক্ট সম্পন্ন করেছি; আমাদের একটি বেসিক ওয়েব সার্ভার আছে যা অ্যাসিঙ্ক্রোনাসভাবে সাড়া দেওয়ার জন্য একটি থ্রেড পুল ব্যবহার করে। আমরা সার্ভারের একটি সুন্দর শাটডাউন সম্পাদন করতে সক্ষম, যা পুলের সমস্ত থ্রেড পরিচ্ছন্ন করে।

রেফারেন্সের জন্য এখানে সম্পূর্ণ কোড দেওয়া হলো:

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

আমরা এখানে আরও অনেক কিছু করতে পারি! আপনি যদি এই প্রজেক্টটি আরও উন্নত করতে চান, তবে এখানে কিছু ধারণা দেওয়া হলো:

  • ThreadPool এবং এর পাবলিক মেথডগুলিতে আরও ডকুমেন্টেশন যোগ করুন।
  • লাইব্রেরির কার্যকারিতার জন্য টেস্ট যোগ করুন।
  • unwrap কলগুলোকে আরও শক্তিশালী এরর হ্যান্ডলিং দিয়ে পরিবর্তন করুন।
  • ওয়েব রিকোয়েস্ট সার্ভ করা ছাড়া অন্য কোনো কাজ সম্পাদন করার জন্য ThreadPool ব্যবহার করুন।
  • crates.io তে একটি থ্রেড পুল ক্রেট খুঁজুন এবং সেই ক্রেট ব্যবহার করে একটি অনুরূপ ওয়েব সার্ভার ইমপ্লিমেন্ট করুন। তারপর এর API এবং শক্তিশালীতার সাথে আমাদের ইমপ্লিমেন্ট করা থ্রেড পুলের তুলনা করুন।

সারাংশ

চমৎকার! আপনি বইয়ের শেষ পর্যন্ত পৌঁছে গেছেন! Rust-এর এই সফরে আমাদের সাথে যোগ দেওয়ার জন্য আমরা আপনাকে ধন্যবাদ জানাতে চাই। আপনি এখন আপনার নিজের Rust প্রজেক্ট ইমপ্লিমেন্ট করতে এবং অন্য লোকের প্রজেক্টে সাহায্য করতে প্রস্তুত। মনে রাখবেন যে অন্যান্য Rustaceans-দের একটি স্বাগত জানানো কমিউনিটি রয়েছে যারা আপনার Rust যাত্রায় যেকোনো চ্যালেঞ্জের সম্মুখীন হলে আপনাকে সাহায্য করতে ভালোবাসবে।