Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

আমাদের সিঙ্গেল-থ্রেডেড সার্ভারকে মাল্টিথ্রেডেড সার্ভারে রূপান্তরিত করা

এই মুহূর্তে, সার্ভারটি প্রতিটি রিকোয়েস্ট একে একে প্রসেস করবে, যার মানে হলো প্রথমটির প্রসেসিং শেষ না হওয়া পর্যন্ত এটি দ্বিতীয় কানেকশন প্রসেস করবে না। সার্ভারে যদি ক্রমাগত রিকোয়েস্ট আসতে থাকে, তাহলে এই সিরিয়াল এক্সিকিউশন পদ্ধতিটি তত কম কার্যকর হতে থাকবে। যদি সার্ভার এমন একটি রিকোয়েস্ট পায় যা প্রসেস করতে অনেক সময় লাগে, তবে পরবর্তী রিকোয়েস্টগুলোকে সেই দীর্ঘ রিকোয়েস্টটি শেষ না হওয়া পর্যন্ত অপেক্ষা করতে হবে, এমনকি যদি নতুন রিকোয়েস্টগুলো দ্রুত প্রসেস করা সম্ভবও হয়। আমাদের এই সমস্যার সমাধান করতে হবে, তবে প্রথমে আমরা সমস্যাটি বাস্তবে দেখব।

একটি ধীরগতির রিকোয়েস্ট সিমুলেট করা

আমরা দেখব কীভাবে একটি ধীরগতির রিকোয়েস্ট আমাদের বর্তমান সার্ভার ইমপ্লিমেন্টেশনে আসা অন্যান্য রিকোয়েস্টকে প্রভাবিত করতে পারে। লিস্টিং ২১-১০ এ /sleep পাথের জন্য একটি রিকোয়েস্ট হ্যান্ডেল করার কোড দেখানো হয়েছে, যেখানে একটি কৃত্রিম ধীরগতির রেসপন্স তৈরি করা হবে যা সার্ভারকে রেসপন্স পাঠানোর আগে পাঁচ সেকেন্ডের জন্য sleep করিয়ে দেবে।

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

যেহেতু এখন আমাদের তিনটি কেস আছে, তাই আমরা if থেকে match-এ পরিবর্তিত হয়েছি। স্ট্রিং লিটারেলের সাথে প্যাটার্ন ম্যাচ করার জন্য আমাদের request_line-এর একটি স্লাইসের উপর স্পষ্টভাবে match করতে হবে; match ইক্যুয়ালিটি মেথডের মতো স্বয়ংক্রিয়ভাবে referencing এবং dereferencing করে না।

প্রথম arm-টি লিস্টিং ২১-৯ এর if ব্লকের মতোই। দ্বিতীয় arm-টি /sleep পাথের একটি রিকোয়েস্টের সাথে ম্যাচ করে। যখন সেই রিকোয়েস্টটি আসে, সার্ভার সফল HTML পেজটি রেন্ডার করার আগে পাঁচ সেকেন্ডের জন্য sleep করবে। তৃতীয় arm-টি লিস্টিং ২১-৯ এর else ব্লকের মতোই।

আপনি দেখতে পাচ্ছেন আমাদের সার্ভার কতটা প্রাথমিক পর্যায়ের: আসল লাইব্রেরিগুলো এর চেয়ে অনেক কম ভার্বোস উপায়ে একাধিক রিকোয়েস্ট শনাক্ত করতে পারত!

cargo run ব্যবহার করে সার্ভারটি শুরু করুন। তারপর দুটি ব্রাউজার উইন্ডো খুলুন: একটি http://127.0.0.1:7878 এর জন্য এবং অন্যটি http://127.0.0.1:7878/sleep এর জন্য। যদি আপনি আগের মতো কয়েকবার / URI টিতে যান, দেখবেন এটি দ্রুত সাড়া দিচ্ছে। কিন্তু যদি আপনি /sleep এ যান এবং তারপরে / লোড করেন, আপনি দেখবেন যে sleep এর পুরো পাঁচ সেকেন্ড শেষ না হওয়া পর্যন্ত / লোড হওয়ার জন্য অপেক্ষা করছে।

একটি ধীরগতির রিকোয়েস্টের কারণে অন্যান্য রিকোয়েস্টের জট এড়ানোর জন্য আমরা অনেক কৌশল ব্যবহার করতে পারি, যার মধ্যে অধ্যায় ১৭-তে ব্যবহৃত async একটি; আমরা এখানে যেটি ইমপ্লিমেন্ট করব সেটি হলো একটি থ্রেড পুল।

একটি থ্রেড পুল দিয়ে থ্রুপুট উন্নত করা

একটি thread pool হলো স্পন করা কিছু থ্রেডের একটি গ্রুপ, যা কোনো কাজ পরিচালনা করার জন্য প্রস্তুত থাকে এবং অপেক্ষা করে। যখন প্রোগ্রাম একটি নতুন টাস্ক পায়, তখন এটি পুলের একটি থ্রেডকে সেই টাস্কটি দেয় এবং সেই থ্রেড টাস্কটি প্রসেস করে। পুলের বাকি থ্রেডগুলো অন্য কোনো টাস্ক পরিচালনা করার জন্য প্রস্তুত থাকে যা প্রথম থ্রেডটির প্রসেসিং চলাকালীন আসে। যখন প্রথম থ্রেডটি তার টাস্ক প্রসেস করা শেষ করে, তখন এটি নিষ্ক্রিয় থ্রেডের পুলে ফিরে আসে, একটি নতুন টাস্ক পরিচালনা করার জন্য প্রস্তুত হয়ে। একটি থ্রেড পুল আপনাকে কানেকশনগুলো কনকারেন্টলি প্রসেস করতে দেয়, যা আপনার সার্ভারের থ্রুপুট বাড়িয়ে তোলে।

আমরা DoS আক্রমণ থেকে নিজেদের রক্ষা করার জন্য পুলের থ্রেডের সংখ্যা একটি ছোট সংখ্যায় সীমাবদ্ধ রাখব; যদি আমাদের প্রোগ্রাম প্রতিটি রিকোয়েস্ট আসার সাথে সাথে একটি নতুন থ্রেড তৈরি করত, তাহলে কেউ আমাদের সার্ভারে ১০ মিলিয়ন রিকোয়েস্ট পাঠিয়ে আমাদের সার্ভারের সমস্ত রিসোর্স ব্যবহার করে এবং রিকোয়েস্ট প্রসেসিং থামিয়ে দিয়ে বিশৃঙ্খলা সৃষ্টি করতে পারত।

সীমাহীন থ্রেড স্পন করার পরিবর্তে, আমাদের পুলে একটি নির্দিষ্ট সংখ্যক থ্রেড অপেক্ষায় থাকবে। আসা রিকোয়েস্টগুলো প্রসেসিংয়ের জন্য পুলে পাঠানো হয়। পুলটি ইনকামিং রিকোয়েস্টগুলোর একটি কিউ (queue) বজায় রাখবে। পুলের প্রতিটি থ্রেড এই কিউ থেকে একটি রিকোয়েস্ট তুলে নেবে, রিকোয়েস্টটি হ্যান্ডেল করবে, এবং তারপর আরেকটি রিকোয়েস্টের জন্য কিউকে জিজ্ঞাসা করবে। এই ডিজাইনের মাধ্যমে, আমরা একযোগে N টি পর্যন্ত রিকোয়েস্ট প্রসেস করতে পারি, যেখানে N হলো থ্রেডের সংখ্যা। যদি প্রতিটি থ্রেড একটি দীর্ঘ সময় ধরে চলা রিকোয়েস্টের জবাব দেয়, তবে পরবর্তী রিকোয়েস্টগুলো কিউতে আটকে যেতে পারে, কিন্তু আমরা সেই পর্যায়ে পৌঁছানোর আগে দীর্ঘ সময় ধরে চলা রিকোয়েস্টের সংখ্যা বাড়িয়ে দিয়েছি।

এই কৌশলটি একটি ওয়েব সার্ভারের থ্রুপুট উন্নত করার অনেক উপায়ের মধ্যে একটি মাত্র। অন্যান্য বিকল্প যা আপনি অন্বেষণ করতে পারেন তা হলো ফর্ক/জয়েন মডেল (fork/join model), সিঙ্গেল-থ্রেডেড অ্যাসিঙ্ক আই/ও মডেল (single-threaded async I/O model) এবং মাল্টি-থ্রেডেড অ্যাসিঙ্ক আই/ও মডেল (multithreaded async I/O model)। আপনি যদি এই বিষয়ে আগ্রহী হন, তবে আপনি অন্যান্য সমাধান সম্পর্কে আরও পড়তে পারেন এবং সেগুলি ইমপ্লিমেন্ট করার চেষ্টা করতে পারেন; Rust-এর মতো একটি লো-লেভেল ল্যাঙ্গুয়েজ দিয়ে, এই সমস্ত বিকল্পই সম্ভব।

আমরা একটি থ্রেড পুল ইমপ্লিমেন্ট করা শুরু করার আগে, চলুন আলোচনা করি যে পুলটি ব্যবহার করার প্রক্রিয়া কেমন হওয়া উচিত। যখন আপনি কোড ডিজাইন করার চেষ্টা করছেন, তখন ক্লায়েন্ট ইন্টারফেসটি প্রথমে লিখে নিলে তা আপনার ডিজাইনকে সঠিক পথে পরিচালিত করতে সাহায্য করে। কোডের API এমনভাবে লিখুন যেভাবে আপনি এটি কল করতে চান; তারপর সেই কাঠামোর মধ্যে কার্যকারিতা ইমপ্লিমেন্ট করুন, কার্যকারিতা ইমপ্লিমেন্ট করে তারপর পাবলিক API ডিজাইন করার পরিবর্তে।

অধ্যায় ১২-এর প্রজেক্টে আমরা যেমন টেস্ট-ড্রিভেন ডেভেলপমেন্ট ব্যবহার করেছি, তেমনি এখানে আমরা কম্পাইলার-ড্রিভেন ডেভেলপমেন্ট (compiler-driven development) ব্যবহার করব। আমরা যে ফাংশনগুলো কল করতে চাই, সেই কোডটি লিখব এবং তারপর কোডটি কাজ করানোর জন্য আমাদের পরবর্তী কী পরিবর্তন করা উচিত তা নির্ধারণ করতে কম্পাইলারের এররগুলো দেখব। তবে, তার আগে, আমরা যে কৌশলটি ব্যবহার করব না সেটি একটি সূচনা বিন্দু হিসাবে অন্বেষণ করব।

প্রতিটি রিকোয়েস্টের জন্য একটি থ্রেড স্পন করা

প্রথমে, চলুন দেখি আমাদের কোড কেমন হতে পারত যদি এটি প্রতিটি কানেকশনের জন্য একটি নতুন থ্রেড তৈরি করত। যেমন আগে উল্লেখ করা হয়েছে, এটি আমাদের চূড়ান্ত পরিকল্পনা নয় কারণ এতে সম্ভাব্য সীমাহীন সংখ্যক থ্রেড স্পন করার সমস্যা রয়েছে, তবে এটি একটি কার্যকর মাল্টিথ্রেডেড সার্ভার তৈরির জন্য একটি সূচনা বিন্দু। তারপর আমরা একটি উন্নতি হিসাবে থ্রেড পুল যোগ করব, এবং দুটি সমাধানের তুলনা করা সহজ হবে।

লিস্টিং ২১-১১ main ফাংশনে করা পরিবর্তনগুলো দেখায়, যেখানে for লুপের মধ্যে প্রতিটি স্ট্রিম হ্যান্ডেল করার জন্য একটি নতুন থ্রেড স্পন করা হয়েছে।

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

যেমন আপনি অধ্যায় ১৬-এ শিখেছেন, thread::spawn একটি নতুন থ্রেড তৈরি করবে এবং তারপর নতুন থ্রেডে ক্লোজারের কোডটি চালাবে। যদি আপনি এই কোডটি চালান এবং আপনার ব্রাউজারে /sleep লোড করেন, তারপর আরও দুটি ব্রাউজার ট্যাবে / লোড করেন, আপনি সত্যিই দেখবেন যে / এর রিকোয়েস্টগুলোকে /sleep শেষ হওয়ার জন্য অপেক্ষা করতে হবে না। তবে, যেমন আমরা উল্লেখ করেছি, এটি অবশেষে সিস্টেমকে অভিভূত করবে কারণ আপনি কোনো সীমা ছাড়াই নতুন থ্রেড তৈরি করছেন।

আপনার হয়তো অধ্যায় ১৭ থেকে মনে থাকতে পারে যে ঠিক এই ধরনের পরিস্থিতিতেই async এবং await সত্যিই অসাধারণ কাজ করে! আমরা যখন থ্রেড পুল তৈরি করব তখন এটি মনে রাখবেন এবং ভাববেন যে async এর সাথে জিনিসগুলো কীভাবে ভিন্ন বা একই রকম দেখতে হতো।

একটি সীমিত সংখ্যক থ্রেড তৈরি করা

আমরা চাই আমাদের থ্রেড পুল একটি একইরকম এবং পরিচিত উপায়ে কাজ করুক, যাতে থ্রেড থেকে থ্রেড পুলে স্যুইচ করার জন্য আমাদের API ব্যবহার করা কোডে বড় পরিবর্তন আনার প্রয়োজন না হয়। লিস্টিং ২১-১২ একটি ThreadPool struct-এর কাল্পনিক ইন্টারফেস দেখায় যা আমরা thread::spawn-এর পরিবর্তে ব্যবহার করতে চাই।

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

আমরা ThreadPool::new ব্যবহার করে একটি কনফিগারযোগ্য সংখ্যক থ্রেডসহ একটি নতুন থ্রেড পুল তৈরি করি, এক্ষেত্রে চারটি। তারপর, for লুপে, pool.execute-এর thread::spawn-এর মতোই একটি ইন্টারফেস রয়েছে, কারণ এটি একটি ক্লোজার নেয় যা পুল প্রতিটি স্ট্রিমের জন্য চালাবে। আমাদের pool.execute ইমপ্লিমেন্ট করতে হবে যাতে এটি ক্লোজারটি নেয় এবং এটি চালানোর জন্য পুলের একটি থ্রেডকে দেয়। এই কোডটি এখনও কম্পাইল হবে না, তবে আমরা চেষ্টা করব যাতে কম্পাইলার আমাদের এটি ঠিক করার পথে গাইড করতে পারে।

কম্পাইলার-ড্রিভেন ডেভেলপমেন্ট ব্যবহার করে ThreadPool তৈরি করা

src/main.rs-এ লিস্টিং ২১-১২ এর পরিবর্তনগুলো করুন, এবং তারপর আমাদের ডেভেলপমেন্টকে চালিত করতে cargo check থেকে আসা কম্পাইলার এররগুলো ব্যবহার করি। এখানে প্রথম এররটি আমরা পাই:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

চমৎকার! এই এররটি আমাদের বলছে যে আমাদের একটি ThreadPool টাইপ বা মডিউল প্রয়োজন, তাই আমরা এখন একটি তৈরি করব। আমাদের ThreadPool ইমপ্লিমেন্টেশন আমাদের ওয়েব সার্ভারের কাজের ধরনের থেকে স্বাধীন হবে। তাই চলুন, ThreadPool ইমপ্লিমেন্টেশন রাখার জন্য hello ক্রেটটিকে একটি বাইনারি ক্রেট থেকে একটি লাইব্রেরি ক্রেটে পরিবর্তন করি। লাইব্রেরি ক্রেটে পরিবর্তন করার পরে, আমরা আলাদা থ্রেড পুল লাইব্রেরিটি যেকোনো কাজের জন্য ব্যবহার করতে পারি, শুধু ওয়েব রিকোয়েস্ট সার্ভ করার জন্য নয়।

একটি src/lib.rs ফাইল তৈরি করুন যাতে নিম্নলিখিত কোডটি থাকে, যা এই মুহূর্তে আমাদের ThreadPool struct-এর সবচেয়ে সহজ সংজ্ঞা:

pub struct ThreadPool;

তারপর src/main.rs ফাইলের শীর্ষে নিম্নলিখিত কোডটি যোগ করে ThreadPool-কে লাইব্রেরি ক্রেট থেকে স্কোপে আনুন:

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

এই কোডটি এখনও কাজ করবে না, কিন্তু চলুন পরবর্তী এররটি পেতে এটি আবার চেক করি যা আমাদের সমাধান করতে হবে:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

এই এররটি নির্দেশ করে যে আমাদের ThreadPool-এর জন্য new নামে একটি অ্যাসোসিয়েটেড ফাংশন তৈরি করতে হবে। আমরা আরও জানি যে new-এর একটি প্যারামিটার থাকতে হবে যা 4 আর্গুমেন্ট হিসাবে গ্রহণ করতে পারে এবং একটি ThreadPool ইনস্ট্যান্স রিটার্ন করা উচিত। চলুন, এই বৈশিষ্ট্যগুলোসহ সবচেয়ে সহজ new ফাংশনটি ইমপ্লিমেন্ট করি:

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

আমরা size প্যারামিটারের টাইপ হিসাবে usize বেছে নিয়েছি কারণ আমরা জানি যে একটি ঋণাত্মক সংখ্যক থ্রেড অর্থহীন। আমরা আরও জানি যে আমরা এই 4-কে থ্রেডের একটি কালেকশনের উপাদানের সংখ্যা হিসাবে ব্যবহার করব, যার জন্য usize টাইপটি ব্যবহৃত হয়, যেমনটি অধ্যায় ৩-এর "Integer Types" এ আলোচনা করা হয়েছে।

চলুন কোডটি আবার চেক করি:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error```

এখন এররটি ঘটছে কারণ `ThreadPool`-এ আমাদের কোনো `execute` মেথড নেই। ["Creating a Finite Number of Threads"](#creating-a-finite-number-of-threads) থেকে মনে করুন যে আমরা সিদ্ধান্ত নিয়েছিলাম আমাদের থ্রেড পুলের `thread::spawn`-এর মতো একটি ইন্টারফেস থাকা উচিত। এছাড়াও, আমরা `execute` ফাংশনটি এমনভাবে ইমপ্লিমেন্ট করব যাতে এটি প্রদত্ত ক্লোজারটি নিয়ে পুলের একটি নিষ্ক্রিয় থ্রেডকে চালানোর জন্য দেয়।

আমরা `ThreadPool`-এ `execute` মেথডটিকে একটি প্যারামিটার হিসাবে একটি ক্লোজার নেওয়ার জন্য সংজ্ঞায়িত করব। অধ্যায় ১৩-এর ["Moving Captured Values Out of the Closure and the `Fn` Traits"][fn-traits] থেকে মনে করুন যে আমরা তিনটি ভিন্ন ট্রেইট দিয়ে ক্লোজারকে প্যারামিটার হিসাবে নিতে পারি: `Fn`, `FnMut`, এবং `FnOnce`। আমাদের এখানে কোন ধরনের ক্লোজার ব্যবহার করতে হবে তা সিদ্ধান্ত নিতে হবে। আমরা জানি যে আমরা শেষ পর্যন্ত স্ট্যান্ডার্ড লাইব্রেরি `thread::spawn` ইমপ্লিমেন্টেশনের মতো কিছু করব, তাই আমরা `thread::spawn`-এর স্বাক্ষরের প্যারামিটারে কী বাউন্ড রয়েছে তা দেখতে পারি। ডকুমেন্টেশন আমাদের নিম্নলিখিতটি দেখায়:

```rust,ignore
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

F টাইপ প্যারামিটারটি এখানে আমাদের উদ্বেগের বিষয়; T টাইপ প্যারামিটারটি রিটার্ন ভ্যালুর সাথে সম্পর্কিত, এবং আমরা এটি নিয়ে চিন্তিত নই। আমরা দেখতে পাচ্ছি যে spawn F-এর উপর ট্রেইট বাউন্ড হিসাবে FnOnce ব্যবহার করে। এটি সম্ভবত আমরাও চাই, কারণ আমরা অবশেষে execute-এ প্রাপ্ত আর্গুমেন্টটি spawn-এ পাস করব। আমরা আরও নিশ্চিত হতে পারি যে FnOnce-ই আমাদের কাঙ্ক্ষিত ট্রেইট, কারণ একটি রিকোয়েস্ট চালানোর জন্য থ্রেডটি সেই রিকোয়েস্টের ক্লোজারটি শুধুমাত্র একবারই এক্সিকিউট করবে, যা FnOnce-এর Once-এর সাথে মিলে যায়।

F টাইপ প্যারামিটারের Send ট্রেইট বাউন্ড এবং 'static লাইফটাইম বাউন্ডও রয়েছে, যা আমাদের পরিস্থিতিতে কার্যকর: আমাদের এক থ্রেড থেকে অন্য থ্রেডে ক্লোজার স্থানান্তর করতে Send প্রয়োজন এবং 'static প্রয়োজন কারণ আমরা জানি না থ্রেডটি এক্সিকিউট হতে কত সময় নেবে। চলুন ThreadPool-এ একটি execute মেথড তৈরি করি যা এই বাউন্ডগুলোসহ F টাইপের একটি জেনেরিক প্যারামিটার নেবে:

pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

আমরা এখনও FnOnce এর পরে () ব্যবহার করছি কারণ এই FnOnce একটি ক্লোজারকে প্রতিনিধিত্ব করে যা কোনো প্যারামিটার নেয় না এবং ইউনিট টাইপ () রিটার্ন করে। ফাংশন সংজ্ঞার মতোই, রিটার্ন টাইপটি স্বাক্ষর থেকে বাদ দেওয়া যেতে পারে, কিন্তু আমাদের কোনো প্যারামিটার না থাকলেও, আমাদের এখনও বন্ধনী প্রয়োজন।

আবারও, এটি execute মেথডের সবচেয়ে সহজ ইমপ্লিমেন্টেশন: এটি কিছুই করে না, কিন্তু আমরা শুধু আমাদের কোড কম্পাইল করার চেষ্টা করছি। চলুন এটি আবার চেক করি:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

এটি কম্পাইল হয়! কিন্তু লক্ষ্য করুন যে আপনি যদি cargo run চেষ্টা করেন এবং ব্রাউজারে একটি রিকোয়েস্ট করেন, তবে আপনি ব্রাউজারে অধ্যায়ের শুরুতে দেখা এররগুলো দেখতে পাবেন। আমাদের লাইব্রেরি আসলে execute-এ পাস করা ক্লোজারটি এখনও কল করছে না!

দ্রষ্টব্য: Haskell এবং Rust-এর মতো কঠোর কম্পাইলারসহ ল্যাঙ্গুয়েজ সম্পর্কে আপনি একটি কথা শুনতে পারেন: "যদি কোড কম্পাইল হয়, তবে এটি কাজ করে।" কিন্তু এই কথাটি সর্বজনীনভাবে সত্য নয়। আমাদের প্রজেক্ট কম্পাইল হচ্ছে, কিন্তু এটি बिल्कुल কিছুই করছে না! যদি আমরা একটি বাস্তব, সম্পূর্ণ প্রজেক্ট তৈরি করতাম, তবে এটি ইউনিট টেস্ট লেখা শুরু করার একটি ভাল সময় হতো যাতে কোড কম্পাইল হয় এবং আমাদের কাঙ্ক্ষিত আচরণ করে।

বিবেচনা করুন: যদি আমরা একটি ক্লোজারের পরিবর্তে একটি ফিউচার (future) এক্সিকিউট করতে যেতাম তাহলে এখানে কী ভিন্ন হতো?

new-তে থ্রেডের সংখ্যা যাচাই করা

আমরা new এবং execute-এর প্যারামিটার দিয়ে কিছুই করছি না। চলুন, আমাদের কাঙ্ক্ষিত আচরণসহ এই ফাংশনগুলোর বডি ইমপ্লিমেন্ট করি। শুরু করার জন্য, চলুন new সম্পর্কে ভাবি। আগে আমরা size প্যারামিটারের জন্য একটি আনসাইন্ড টাইপ বেছে নিয়েছিলাম কারণ ঋণাত্মক সংখ্যক থ্রেডসহ একটি পুল অর্থহীন। তবে, শূন্য থ্রেডসহ একটি পুলও অর্থহীন, তবুও শূন্য একটি পুরোপুরি বৈধ usize। আমরা কোড যোগ করব যা ThreadPool ইনস্ট্যান্স রিটার্ন করার আগে size শূন্যের চেয়ে বড় কিনা তা পরীক্ষা করবে এবং যদি শূন্য পায় তবে assert! ম্যাক্রো ব্যবহার করে প্রোগ্রামটি প্যানিক করাবে, যেমনটি লিস্টিং ২১-১৩ এ দেখানো হয়েছে।

pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

আমরা ডক কমেন্ট দিয়ে আমাদের ThreadPool-এর জন্য কিছু ডকুমেন্টেশনও যোগ করেছি। লক্ষ্য করুন যে আমরা অধ্যায় ১৪-তে আলোচনা করা অনুযায়ী ভাল ডকুমেন্টেশন পদ্ধতি অনুসরণ করেছি এবং এমন একটি বিভাগ যোগ করেছি যা সেই পরিস্থিতিগুলো উল্লেখ করে যেখানে আমাদের ফাংশন প্যানিক করতে পারে। cargo doc --open চালান এবং ThreadPool struct-এ ক্লিক করে দেখুন new-এর জন্য জেনারেট করা ডক্স কেমন দেখায়!

এখানে assert! ম্যাক্রো যোগ করার পরিবর্তে, আমরা new-কে build-এ পরিবর্তন করতে পারতাম এবং I/O প্রজেক্টে লিস্টিং ১২-৯-এর Config::build-এর মতো একটি Result রিটার্ন করতে পারতাম। কিন্তু আমরা এই ক্ষেত্রে সিদ্ধান্ত নিয়েছি যে কোনো থ্রেড ছাড়াই একটি থ্রেড পুল তৈরি করার চেষ্টা একটি ناقابل পুনরুদ্ধারযোগ্য এরর হওয়া উচিত। আপনি যদি উচ্চাকাঙ্ক্ষী হন, তবে new ফাংশনের সাথে তুলনা করার জন্য নিম্নলিখিত স্বাক্ষরসহ build নামে একটি ফাংশন লেখার চেষ্টা করুন:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

থ্রেড সংরক্ষণ করার জন্য স্থান তৈরি করা

এখন যেহেতু আমরা জানি যে পুলে সংরক্ষণ করার জন্য আমাদের কাছে একটি বৈধ সংখ্যক থ্রেড আছে, আমরা সেই থ্রেডগুলো তৈরি করতে পারি এবং ThreadPool struct-এ সংরক্ষণ করতে পারি structটি রিটার্ন করার আগে। কিন্তু আমরা কীভাবে একটি থ্রেড "সংরক্ষণ" করব? চলুন thread::spawn এর স্বাক্ষরটি আরেকবার দেখি:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

spawn ফাংশনটি একটি JoinHandle<T> রিটার্ন করে, যেখানে T হলো সেই টাইপ যা ক্লোজারটি রিটার্ন করে। চলুন JoinHandle ব্যবহার করে দেখি কী হয়। আমাদের ক্ষেত্রে, আমরা যে ক্লোজারগুলো থ্রেড পুলে পাস করছি সেগুলি কানেকশন হ্যান্ডেল করবে এবং কিছু রিটার্ন করবে না, তাই T হবে ইউনিট টাইপ ()

লিস্টিং ২১-১৪ এর কোডটি কম্পাইল হবে কিন্তু এখনও কোনো থ্রেড তৈরি করবে না। আমরা ThreadPool-এর সংজ্ঞা পরিবর্তন করে thread::JoinHandle<()> ইনস্ট্যান্সের একটি ভেক্টর ধারণ করার ব্যবস্থা করেছি, size ক্যাপাসিটিসহ ভেক্টরটি ইনিশিয়ালাইজ করেছি, একটি for লুপ সেট আপ করেছি যা থ্রেড তৈরির জন্য কিছু কোড চালাবে, এবং তাদের ধারণকারী একটি ThreadPool ইনস্ট্যান্স রিটার্ন করেছি।

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

আমরা লাইব্রেরি ক্রেটে std::thread-কে স্কোপে নিয়ে এসেছি কারণ আমরা ThreadPool-এর ভেক্টরের আইটেমগুলোর টাইপ হিসাবে thread::JoinHandle ব্যবহার করছি।

একবার একটি বৈধ সাইজ পেলে, আমাদের ThreadPool একটি নতুন ভেক্টর তৈরি করে যা size সংখ্যক আইটেম ধারণ করতে পারে। with_capacity ফাংশনটি Vec::new-এর মতোই কাজ করে কিন্তু একটি গুরুত্বপূর্ণ পার্থক্যসহ: এটি ভেক্টরে স্থান পূর্ব-বরাদ্দ করে। যেহেতু আমরা জানি যে আমাদের ভেক্টরে size সংখ্যক এলিমেন্ট সংরক্ষণ করতে হবে, তাই এই বরাদ্দটি আগে থেকে করা Vec::new ব্যবহার করার চেয়ে কিছুটা বেশি কার্যকর, যা এলিমেন্ট প্রবেশ করানোর সাথে সাথে নিজেকে রিসাইজ করে।

যখন আপনি cargo check আবার চালাবেন, এটি সফল হওয়া উচিত।

ThreadPool থেকে একটি থ্রেডে কোড পাঠানো

আমরা লিস্টিং ২১-১৪ এর for লুপে থ্রেড তৈরির বিষয়ে একটি কমেন্ট রেখেছিলাম। এখানে, আমরা দেখব কীভাবে আমরা আসলে থ্রেড তৈরি করি। স্ট্যান্ডার্ড লাইব্রেরি থ্রেড তৈরির একটি উপায় হিসাবে thread::spawn প্রদান করে, এবং thread::spawn আশা করে যে থ্রেডটি তৈরি হওয়ার সাথে সাথেই চালানোর জন্য কিছু কোড পাবে। তবে, আমাদের ক্ষেত্রে, আমরা থ্রেডগুলো তৈরি করতে এবং তাদের এমন কোডের জন্য অপেক্ষা করতে চাই যা আমরা পরে পাঠাব। স্ট্যান্ডার্ড লাইব্রেরির থ্রেড ইমপ্লিমেন্টেশনে এটি করার কোনো উপায় অন্তর্ভুক্ত নেই; আমাদের এটি ম্যানুয়ালি ইমপ্লিমেন্ট করতে হবে।

আমরা ThreadPool এবং থ্রেডগুলোর মধ্যে একটি নতুন ডেটা স্ট্রাকচার চালু করে এই আচরণটি ইমপ্লিমেন্ট করব যা এই নতুন আচরণটি পরিচালনা করবে। আমরা এই ডেটা স্ট্রাকচারটিকে Worker বলব, যা পুলিং ইমপ্লিমেন্টেশনে একটি সাধারণ পরিভাষা। Worker চালানোর জন্য প্রয়োজনীয় কোড তুলে নেয় এবং সেই কোডটি তার থ্রেডে চালায়।

একটি রেস্তোরাঁর রান্নাঘরে কাজ করা লোকদের কথা ভাবুন: কর্মীরা গ্রাহকদের কাছ থেকে অর্ডার আসা পর্যন্ত অপেক্ষা করে, এবং তারপর তারা সেই অর্ডারগুলো নিয়ে সেগুলি পূরণ করার জন্য দায়ী।

থ্রেড পুলে JoinHandle<()> ইনস্ট্যান্সের একটি ভেক্টর সংরক্ষণ করার পরিবর্তে, আমরা Worker struct-এর ইনস্ট্যান্স সংরক্ষণ করব। প্রতিটি Worker একটি একক JoinHandle<()> ইনস্ট্যান্স সংরক্ষণ করবে। তারপর আমরা Worker-এর উপর একটি মেথড ইমপ্লিমেন্ট করব যা চালানোর জন্য একটি ক্লোজার নেবে এবং এটি এক্সিকিউশনের জন্য ইতিমধ্যে চলমান থ্রেডে পাঠাবে। আমরা প্রতিটি Worker-কে একটি id দেব যাতে আমরা লগিং বা ডিবাগিং করার সময় পুলের বিভিন্ন Worker ইনস্ট্যান্সের মধ্যে পার্থক্য করতে পারি।

এখানে নতুন প্রক্রিয়াটি রয়েছে যা আমরা একটি ThreadPool তৈরি করার সময় ঘটবে। Worker-কে এভাবে সেট আপ করার পরে আমরা ক্লোজারটি থ্রেডে পাঠানোর কোডটি ইমপ্লিমেন্ট করব:

  1. একটি Worker struct সংজ্ঞায়িত করুন যা একটি id এবং একটি JoinHandle<()> ধারণ করে।
  2. ThreadPool-কে Worker ইনস্ট্যান্সের একটি ভেক্টর ধারণ করার জন্য পরিবর্তন করুন।
  3. একটি Worker::new ফাংশন সংজ্ঞায়িত করুন যা একটি id নম্বর নেয় এবং একটি Worker ইনস্ট্যান্স রিটার্ন করে যা id এবং একটি খালি ক্লোজার দিয়ে স্পন করা একটি থ্রেড ধারণ করে।
  4. ThreadPool::new-এ, for লুপ কাউন্টার ব্যবহার করে একটি id তৈরি করুন, সেই id দিয়ে একটি নতুন Worker তৈরি করুন, এবং Worker-টিকে ভেক্টরে সংরক্ষণ করুন।

আপনি যদি একটি চ্যালেঞ্জের জন্য প্রস্তুত হন, তবে লিস্টিং ২১-১৫ এর কোডটি দেখার আগে নিজে এই পরিবর্তনগুলি ইমপ্লিমেন্ট করার চেষ্টা করুন।

প্রস্তুত? এখানে লিস্টিং ২১-১৫ রয়েছে যা পূর্ববর্তী পরিবর্তনগুলি করার একটি উপায় দেখায়।

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

আমরা ThreadPool-এর ফিল্ডের নাম threads থেকে workers-এ পরিবর্তন করেছি কারণ এটি এখন JoinHandle<()> ইনস্ট্যান্সের পরিবর্তে Worker ইনস্ট্যান্স ধারণ করছে। আমরা for লুপের কাউন্টারটিকে Worker::new-এর আর্গুমেন্ট হিসাবে ব্যবহার করি, এবং আমরা প্রতিটি নতুন Worker-কে workers নামের ভেক্টরে সংরক্ষণ করি।

বাহ্যিক কোড (যেমন src/main.rs-এ আমাদের সার্ভার) ThreadPool-এর মধ্যে একটি Worker struct ব্যবহারের বাস্তবায়ন বিবরণ জানার প্রয়োজন নেই, তাই আমরা Worker struct এবং তার new ফাংশনটিকে প্রাইভেট করে দিই। Worker::new ফাংশনটি আমাদের দেওয়া id ব্যবহার করে এবং একটি JoinHandle<()> ইনস্ট্যান্স সংরক্ষণ করে যা একটি খালি ক্লোজার ব্যবহার করে একটি নতুন থ্রেড স্পন করে তৈরি করা হয়।

দ্রষ্টব্য: যদি অপারেটিং সিস্টেম পর্যাপ্ত সিস্টেম রিসোর্স না থাকার কারণে একটি থ্রেড তৈরি করতে না পারে, thread::spawn প্যানিক করবে। এটি আমাদের পুরো সার্ভারটিকে প্যানিক করাবে, যদিও কিছু থ্রেড তৈরি সফল হতে পারে। সরলতার জন্য, এই আচরণটি ঠিক আছে, কিন্তু একটি প্রোডাকশন থ্রেড পুল ইমপ্লিমেন্টেশনে, আপনি সম্ভবত std::thread::Builder এবং তার spawn মেথড ব্যবহার করতে চাইবেন যা Result রিটার্ন করে।

এই কোডটি কম্পাইল হবে এবং ThreadPool::new-এর আর্গুমেন্ট হিসাবে নির্দিষ্ট করা Worker ইনস্ট্যান্সের সংখ্যা সংরক্ষণ করবে। কিন্তু আমরা এখনও execute-এ পাওয়া ক্লোজারটি প্রসেস করছি না। চলুন দেখি পরবর্তীতে এটি কীভাবে করা যায়।

চ্যানেলগুলির মাধ্যমে থ্রেডগুলিতে রিকোয়েস্ট পাঠানো

পরবর্তী সমস্যাটি হলো thread::spawn-কে দেওয়া ক্লোজারগুলো কিছুই করে না। বর্তমানে, আমরা execute মেথডে যে ক্লোজারটি এক্সিকিউট করতে চাই তা পাই। কিন্তু ThreadPool তৈরির সময় প্রতিটি Worker তৈরি করার সময় আমাদের thread::spawn-কে চালানোর জন্য একটি ক্লোজার দিতে হবে।

আমরা চাই যে আমরা এইমাত্র যে Worker struct গুলো তৈরি করেছি সেগুলি ThreadPool-এ রাখা একটি কিউ থেকে চালানোর জন্য কোড আনুক এবং সেই কোডটি চালানোর জন্য তার থ্রেডে পাঠাক।

অধ্যায় ১৬-তে শেখা চ্যানেলগুলো—দুটি থ্রেডের মধ্যে যোগাযোগের একটি সহজ উপায়—এই ব্যবহারের জন্য উপযুক্ত হবে। আমরা একটি চ্যানেলকে জবের কিউ হিসাবে কাজ করার জন্য ব্যবহার করব, এবং execute ThreadPool থেকে Worker ইনস্ট্যান্সগুলিতে একটি জব পাঠাবে, যা জবটি তার থ্রেডে পাঠাবে। এখানে পরিকল্পনাটি হলো:

  1. ThreadPool একটি চ্যানেল তৈরি করবে এবং সেন্ডারটি ধরে রাখবে।
  2. প্রতিটি Worker রিসিভারটি ধরে রাখবে।
  3. আমরা একটি নতুন Job struct তৈরি করব যা চ্যানেলের মাধ্যমে পাঠাতে চাওয়া ক্লোজারগুলো ধারণ করবে।
  4. execute মেথডটি যে জবটি এক্সিকিউট করতে চায় তা সেন্ডারের মাধ্যমে পাঠাবে।
  5. তার থ্রেডে, Worker তার রিসিভারের উপর লুপ করবে এবং প্রাপ্ত যেকোনো জবের ক্লোজার এক্সিকিউট করবে।

চলুন ThreadPool::new-এ একটি চ্যানেল তৈরি করে এবং ThreadPool ইনস্ট্যান্সে সেন্ডারটি ধরে রাখার মাধ্যমে শুরু করি, যেমনটি লিস্টিং ২১-১৬ এ দেখানো হয়েছে। Job struct টি আপাতত কিছু ধারণ করে না তবে এটি সেই আইটেমের টাইপ হবে যা আমরা চ্যানেলের মাধ্যমে পাঠাচ্ছি।

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

ThreadPool::new-এ, আমরা আমাদের নতুন চ্যানেল তৈরি করি এবং পুলটিকে সেন্ডার ধরে রাখতে দিই। এটি সফলভাবে কম্পাইল হবে।

চলুন থ্রেড পুল চ্যানেল তৈরি করার সময় প্রতিটি Worker-এর মধ্যে চ্যানেলের একটি রিসিভার পাস করার চেষ্টা করি। আমরা জানি যে আমরা Worker ইনস্ট্যান্স দ্বারা স্পন করা থ্রেডে রিসিভারটি ব্যবহার করতে চাই, তাই আমরা ক্লোজারে receiver প্যারামিটারটি রেফারেন্স করব। লিস্টিং ২১-১৭ এর কোডটি এখনও ঠিক কম্পাইল হবে না।

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

আমরা কিছু ছোট এবং সহজ পরিবর্তন করেছি: আমরা Worker::new-এ রিসিভার পাস করেছি, এবং তারপর আমরা এটি ক্লোজারের ভিতরে ব্যবহার করেছি।

যখন আমরা এই কোডটি চেক করার চেষ্টা করি, আমরা এই এররটি পাই:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error```

কোডটি `receiver`-কে একাধিক `Worker` ইনস্ট্যান্সে পাস করার চেষ্টা করছে। এটি কাজ করবে না, যেমন আপনি অধ্যায় ১৬ থেকে মনে করতে পারেন: Rust দ্বারা প্রদত্ত চ্যানেল ইমপ্লিমেন্টেশনটি হলো মাল্টিপল _প্রডিউসার_, সিঙ্গেল _কনজিউমার_। এর মানে আমরা এই কোডটি ঠিক করার জন্য চ্যানেলের কনজিউমিং এন্ডটি ক্লোন করতে পারি না। আমরা একটি বার্তা একাধিক কনজিউমারের কাছে একাধিকবার পাঠাতেও চাই না; আমরা একাধিক `Worker` ইনস্ট্যান্সসহ বার্তাগুলির একটি তালিকা চাই যাতে প্রতিটি বার্তা একবার প্রসেস হয়।

এছাড়াও, চ্যানেল কিউ থেকে একটি জব নেওয়ার জন্য `receiver`-কে মিউটেট করতে হয়, তাই থ্রেডগুলির `receiver`-কে শেয়ার এবং পরিবর্তন করার জন্য একটি নিরাপদ উপায় প্রয়োজন; অন্যথায়, আমরা রেস কন্ডিশন পেতে পারি (যেমনটি অধ্যায় ১৬-তে আলোচনা করা হয়েছে)।

অধ্যায় ১৬-তে আলোচনা করা থ্রেড-সেফ স্মার্ট পয়েন্টারগুলির কথা মনে করুন: একাধিক থ্রেডে মালিকানা শেয়ার করতে এবং থ্রেডগুলিকে মান পরিবর্তন করার অনুমতি দিতে, আমাদের `Arc<Mutex<T>>` ব্যবহার করতে হবে। `Arc` টাইপ একাধিক `Worker`-কে রিসিভারের মালিকানা দেবে এবং `Mutex` নিশ্চিত করবে যে একবারে শুধুমাত্র একটি `Worker`-ই রিসিভার থেকে জব পাবে। লিস্টিং ২১-১৮ আমাদের প্রয়োজনীয় পরিবর্তনগুলি দেখায়।

<Listing number="21-18" file-name="src/lib.rs" caption="`Arc` এবং `Mutex` ব্যবহার করে `Worker` ইনস্ট্যান্সগুলির মধ্যে রিসিভার শেয়ার করা">

```rust,noplayground
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --snip--

# pub struct ThreadPool {
#     workers: Vec<Worker>,
#     sender: mpsc::Sender<Job>,
# }
# 
# struct Job;
# 
impl ThreadPool {
    // --snip--
#     /// Create a new ThreadPool.
#     ///
#     /// The size is the number of threads in the pool.
#     ///
#     /// # Panics
#     ///
#     /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--
# 
#     pub fn execute<F>(&self, f: F)
#     where
#         F: FnOnce() + Send + 'static,
#     {
#     }
}

// --snip--

# struct Worker {
#     id: usize,
#     thread: thread::JoinHandle<()>,
# }
# 
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
#         let thread = thread::spawn(|| {
#             receiver;
#         });
# 
#         Worker { id, thread }
    }
}

ThreadPool::new-এ, আমরা রিসিভারটিকে একটি Arc এবং একটি Mutex-এ রাখি। প্রতিটি নতুন Worker-এর জন্য, আমরা Arc ক্লোন করি যাতে রেফারেন্স কাউন্ট বাড়ে এবং Worker ইনস্ট্যান্সগুলি রিসিভারের মালিকানা শেয়ার করতে পারে।

এই পরিবর্তনগুলির সাথে, কোডটি কম্পাইল হয়! আমরা লক্ষ্যের কাছাকাছি চলে এসেছি!

execute মেথডটি ইমপ্লিমেন্ট করা

চলুন অবশেষে ThreadPool-এর execute মেথডটি ইমপ্লিমেন্ট করি। আমরা Job-কে একটি struct থেকে একটি trait object-এর জন্য একটি টাইপ এলিয়াসে পরিবর্তন করব যা execute-এর প্রাপ্ত ক্লোজারের টাইপ ধারণ করে। অধ্যায় ২০-এর "Creating Type Synonyms with Type Aliases"-এ আলোচনা করা হয়েছে, টাইপ এলিয়াস আমাদের দীর্ঘ টাইপগুলোকে ব্যবহারের সুবিধার জন্য ছোট করতে দেয়। লিস্টিং ২১-১৯ দেখুন।

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

execute-এ প্রাপ্ত ক্লোজার ব্যবহার করে একটি নতুন Job ইনস্ট্যান্স তৈরি করার পরে, আমরা সেই জবটি চ্যানেলের সেন্ডিং এন্ড দিয়ে পাঠাই। আমরা send-এর উপর unwrap কল করছি যদি সেন্ডিং ব্যর্থ হয় সেই কেসের জন্য। এটি ঘটতে পারে যদি, উদাহরণস্বরূপ, আমরা আমাদের সমস্ত থ্রেডকে এক্সিকিউট করা থেকে বিরত রাখি, যার মানে রিসিভিং এন্ড নতুন বার্তা গ্রহণ করা বন্ধ করে দিয়েছে। এই মুহূর্তে, আমরা আমাদের থ্রেডগুলিকে এক্সিকিউট করা থেকে বিরত রাখতে পারি না: আমাদের থ্রেডগুলি পুল বিদ্যমান থাকা পর্যন্ত এক্সিকিউট হতে থাকে। আমরা unwrap ব্যবহার করার কারণ হলো আমরা জানি যে ব্যর্থতার কেসটি ঘটবে না, কিন্তু কম্পাইলার তা জানে না।

কিন্তু আমরা এখনও পুরোপুরি শেষ করিনি! Worker-এ, thread::spawn-কে পাস করা আমাদের ক্লোজারটি এখনও শুধুমাত্র চ্যানেলের রিসিভিং এন্ডকে রেফারেন্স করে। এর পরিবর্তে, আমাদের ক্লোজারটিকে চিরতরে লুপ করতে হবে, চ্যানেলের রিসিভিং এন্ড থেকে একটি জবের জন্য জিজ্ঞাসা করতে হবে এবং যখন এটি একটি জব পায় তখন সেটি চালাতে হবে। চলুন লিস্টিং ২১-২০-তে দেখানো পরিবর্তনটি Worker::new-তে করি।

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

এখানে, আমরা প্রথমে মিউটেক্স অর্জন করতে receiver-এর উপর lock কল করি, এবং তারপর আমরা যেকোনো এররের উপর প্যানিক করার জন্য unwrap কল করি। একটি লক অর্জন ব্যর্থ হতে পারে যদি মিউটেক্সটি একটি poisoned অবস্থায় থাকে, যা ঘটতে পারে যদি অন্য কোনো থ্রেড লকটি ধরে রাখার সময় প্যানিক করে এবং লকটি রিলিজ না করে। এই পরিস্থিতিতে, এই থ্রেডটিকে প্যানিক করানোর জন্য unwrap কল করা সঠিক পদক্ষেপ। আপনি এই unwrap-কে আপনার জন্য অর্থপূর্ণ একটি এরর বার্তা সহ একটি expect-এ পরিবর্তন করতে পারেন।

যদি আমরা মিউটেক্সের উপর লক পাই, আমরা চ্যানেল থেকে একটি Job গ্রহণ করতে recv কল করি। একটি চূড়ান্ত unwrap এখানেও যেকোনো এরর পার করে দেয়, যা ঘটতে পারে যদি সেন্ডার ধারণকারী থ্রেডটি বন্ধ হয়ে যায়, যেমন send মেথডটি রিসিভার বন্ধ হয়ে গেলে Err রিটার্ন করে।

recv-এর কলটি ব্লক করে, তাই যদি এখনও কোনো জব না থাকে, বর্তমান থ্রেডটি একটি জব উপলব্ধ না হওয়া পর্যন্ত অপেক্ষা করবে। Mutex<T> নিশ্চিত করে যে একবারে শুধুমাত্র একটি Worker থ্রেডই একটি জব রিকোয়েস্ট করার চেষ্টা করছে।

আমাদের থ্রেড পুল এখন একটি কার্যকরী অবস্থায় আছে! cargo run দিন এবং কিছু রিকোয়েস্ট করুন:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

সফল! আমাদের এখন একটি থ্রেড পুল আছে যা কানেকশনগুলো অ্যাসিঙ্ক্রোনাসভাবে এক্সিকিউট করে। এখানে কখনও চারের বেশি থ্রেড তৈরি হয় না, তাই সার্ভার যদি অনেক রিকোয়েস্ট পায় তবে আমাদের সিস্টেম ওভারলোড হবে না। যদি আমরা /sleep-এ একটি রিকোয়েস্ট করি, সার্ভারটি অন্য একটি থ্রেড দিয়ে অন্য রিকোয়েস্টগুলো সার্ভ করতে সক্ষম হবে।

দ্রষ্টব্য: আপনি যদি একাধিক ব্রাউজার উইন্ডোতে একই সাথে /sleep খোলেন, তবে সেগুলি পাঁচ সেকেন্ডের ব্যবধানে একে একে লোড হতে পারে। কিছু ওয়েব ব্রাউজার ক্যাশিংয়ের কারণে একই রিকোয়েস্টের একাধিক ইনস্ট্যান্স ক্রমানুসারে এক্সিকিউট করে। এই সীমাবদ্ধতাটি আমাদের ওয়েব সার্ভারের কারণে নয়।

এটি একটি ভাল সময় থামার এবং চিন্তা করার যে লিস্টিং ২১-১৮, ২১-১৯, এবং ২১-২০-এর কোড কীভাবে ভিন্ন হতো যদি আমরা কাজ করার জন্য একটি ক্লোজারের পরিবর্তে ফিউচার (futures) ব্যবহার করতাম। কোন টাইপগুলি পরিবর্তন হতো? মেথড সিগনেচারগুলো কীভাবে ভিন্ন হতো, যদি überhaupt হয়? কোডের কোন অংশগুলো একই থাকতো?

অধ্যায় ১৭ এবং অধ্যায় ১৯-এ while let লুপ সম্পর্কে শেখার পর, আপনি হয়তো ভাবছেন কেন আমরা Worker থ্রেডের কোডটি লিস্টিং ২১-২১-এ দেখানো উপায়ে লিখিনি।

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

এই কোডটি কম্পাইল এবং রান হয় কিন্তু কাঙ্ক্ষিত থ্রেডিং আচরণ দেয় না: একটি ধীরগতির রিকোয়েস্ট এখনও অন্যান্য রিকোয়েস্টকে প্রসেস করার জন্য অপেক্ষা করাবে। কারণটি কিছুটা সূক্ষ্ম: Mutex struct-এর কোনো পাবলিক unlock মেথড নেই কারণ লকের মালিকানা lock মেথডের রিটার্ন করা LockResult<MutexGuard<T>>-এর মধ্যে থাকা MutexGuard<T>-এর লাইফটাইমের উপর ভিত্তি করে। কম্পাইল টাইমে, বোরো চেকার তখন এই নিয়মটি প্রয়োগ করতে পারে যে একটি Mutex দ্বারা সুরক্ষিত রিসোর্স অ্যাক্সেস করা যাবে না যদি না আমরা লকটি ধরে রাখি। তবে, এই ইমপ্লিমেন্টেশনটি MutexGuard<T>-এর লাইফটাইম সম্পর্কে সতর্ক না থাকলে উদ্দেশ্যর চেয়ে বেশি সময় ধরে লক ধরে রাখতে পারে।

লিস্টিং ২১-২০-এর কোড যা let job = receiver.lock().unwrap().recv().unwrap(); ব্যবহার করে, তা কাজ করে কারণ let-এর সাথে, সমান চিহ্নের ডানদিকে এক্সপ্রেশনে ব্যবহৃত যেকোনো টেম্পোরারি ভ্যালু let স্টেটমেন্ট শেষ হওয়ার সাথে সাথে ড্রপ হয়ে যায়। তবে, while let (এবং if letmatch) সংশ্লিষ্ট ব্লকের শেষ না হওয়া পর্যন্ত টেম্পোরারি ভ্যালু ড্রপ করে না। লিস্টিং ২১-২১-এ, job() কলের পুরো সময় ধরে লকটি ধরা থাকে, যার মানে অন্য Worker ইনস্ট্যান্সগুলো জব রিসিভ করতে পারে না।