আমাদের সিঙ্গেল-থ্রেডেড সার্ভারকে মাল্টিথ্রেডেড সার্ভারে পরিণত করা
এখন, সার্ভার প্রতিটি অনুরোধ একে একে প্রক্রিয়া করবে, মানে এটি প্রথমটির প্রসেসিং শেষ না হওয়া পর্যন্ত দ্বিতীয় কানেকশনটি প্রক্রিয়া করবে না। যদি সার্ভার আরও বেশি সংখ্যক অনুরোধ গ্রহণ করে, তাহলে এই সিরিয়াল এক্সিকিউশন ক্রমশ কম অপ্টিমাল হবে। যদি সার্ভার এমন একটি অনুরোধ পায় যা প্রক্রিয়া করতে বেশি সময় নেয়, তাহলে পরবর্তী অনুরোধগুলিকে দীর্ঘ অনুরোধটি শেষ না হওয়া পর্যন্ত অপেক্ষা করতে হবে, এমনকি যদি নতুন অনুরোধগুলি দ্রুত প্রক্রিয়া করা যায় তাহলেও। আমাদের এটি ঠিক করতে হবে, কিন্তু প্রথমে আমরা সমস্যাটি কার্যকর অবস্থায় দেখব।
বর্তমান সার্ভার ইমপ্লিমেন্টেশনে একটি স্লো রিকোয়েস্ট সিমুলেট করা
আমরা দেখব কিভাবে একটি স্লো-প্রসেসিং অনুরোধ আমাদের বর্তমান সার্ভার ইমপ্লিমেন্টেশনে করা অন্যান্য অনুরোধগুলিকে প্রভাবিত করতে পারে। লিস্টিং ২১-১০ /sleep-এ একটি অনুরোধ হ্যান্ডেল করা ইমপ্লিমেন্ট করে, যেখানে একটি সিমুলেটেড স্লো রেসপন্স থাকবে যা সার্ভারকে প্রতিক্রিয়া জানানোর আগে পাঁচ সেকেন্ডের জন্য স্লিপিং মোডে রাখবে।
use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // --snip-- let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
আমরা if
থেকে match
-এ পরিবর্তন করেছি, কারণ এখন আমাদের তিনটি কেস রয়েছে। স্ট্রিং লিটারেল ভ্যালুগুলির সাথে প্যাটার্ন ম্যাচ করার জন্য আমাদের request_line
-এর একটি স্লাইসের উপর স্পষ্টভাবে ম্যাচ করতে হবে; match
স্বয়ংক্রিয়ভাবে রেফারেন্সিং এবং ডিরেফারেন্সিং করে না, যেমনটি ইকুয়ালিটি মেথড করে।
প্রথম আর্মটি লিস্টিং ২১-৯ এর if
ব্লকের মতোই। দ্বিতীয় আর্মটি /sleep-এর অনুরোধের সাথে মেলে। যখন সেই অনুরোধটি গৃহীত হয়, সার্ভার সফল HTML পেজটি রেন্ডার করার আগে পাঁচ সেকেন্ডের জন্য স্লিপ করবে। তৃতীয় আর্মটি লিস্টিং ২১-৯ এর else
ব্লকের মতোই।
আপনি দেখতে পাচ্ছেন কিভাবে আমাদের সার্ভারটি প্রাথমিক: প্রকৃত লাইব্রেরিগুলি একাধিক অনুরোধের স্বীকৃতি আরও কম ভারবোস উপায়ে হ্যান্ডেল করবে!
cargo run
ব্যবহার করে সার্ভার শুরু করুন। তারপর দুটি ব্রাউজার উইন্ডো খুলুন: একটি http://127.0.0.1:7878/ এর জন্য এবং অন্যটি http://127.0.0.1:7878/sleep এর জন্য। আপনি যদি আগের মতো কয়েকবার / URI-তে প্রবেশ করেন, তাহলে আপনি দেখতে পাবেন এটি দ্রুত প্রতিক্রিয়া জানাচ্ছে। কিন্তু আপনি যদি /sleep-এ প্রবেশ করেন এবং তারপর / লোড করেন, তাহলে আপনি দেখতে পাবেন যে / লোড হওয়ার আগে sleep
তার পুরো পাঁচ সেকেন্ডের জন্য ঘুমিয়েছে।
একটি স্লো অনুরোধের পিছনে অন্যান্য অনুরোধগুলি আটকে যাওয়া এড়াতে আমরা একাধিক কৌশল ব্যবহার করতে পারি, যার মধ্যে একটি হল async ব্যবহার করা যেমনটি আমরা ১৭ অধ্যায়ে করেছি; আমরা যেটি ইমপ্লিমেন্ট করব সেটি হল একটি থ্রেড পুল।
থ্রেড পুল দিয়ে থ্রুপুট উন্নত করা
একটি থ্রেড পুল হল স্পন করা থ্রেডগুলির একটি গ্রুপ যা অপেক্ষা করছে এবং একটি কাজ হ্যান্ডেল করার জন্য প্রস্তুত। যখন প্রোগ্রামটি একটি নতুন কাজ পায়, তখন এটি পুলের একটি থ্রেডকে কাজটি অর্পণ করে এবং সেই থ্রেডটি কাজটি প্রক্রিয়া করবে। পুলের অবশিষ্ট থ্রেডগুলি অন্য কোনো কাজ হ্যান্ডেল করার জন্য উপলব্ধ থাকে, যখন প্রথম থ্রেডটি প্রসেসিং করছে। যখন প্রথম থ্রেডটি তার কাজ প্রক্রিয়া করা শেষ করে, তখন সেটি অলস থ্রেডগুলির পুলে ফিরে আসে, একটি নতুন কাজ হ্যান্ডেল করার জন্য প্রস্তুত। একটি থ্রেড পুল আপনাকে কানেকশনগুলি কনকারেন্টলি প্রক্রিয়া করার অনুমতি দেয়, আপনার সার্ভারের থ্রুপুট বৃদ্ধি করে।
DoS অ্যাটাক থেকে আমাদের রক্ষা করার জন্য আমরা পুলের থ্রেডের সংখ্যা একটি ছোট সংখ্যায় সীমাবদ্ধ করব; যদি আমাদের প্রোগ্রাম প্রতিটি অনুরোধ আসার সাথে সাথে একটি নতুন থ্রেড তৈরি করত, তাহলে কেউ আমাদের সার্ভারে ১০ মিলিয়ন অনুরোধ করলে আমাদের সার্ভারের সমস্ত সংস্থান ব্যবহার করে এবং অনুরোধগুলির প্রসেসিং বন্ধ করে দিয়ে বিপর্যয় সৃষ্টি করতে পারত।
আনলিমিটেড থ্রেড স্পন করার পরিবর্তে, আমাদের পুলে একটি নির্দিষ্ট সংখ্যক থ্রেড অপেক্ষা করবে। আসা অনুরোধগুলি প্রসেসিংয়ের জন্য পুলে পাঠানো হয়। পুলটি আগত অনুরোধগুলির একটি কিউ বজায় রাখবে। পুলের প্রতিটি থ্রেড এই কিউ থেকে একটি অনুরোধ তুলে নেবে, অনুরোধটি হ্যান্ডেল করবে এবং তারপর কিউ-কে অন্য একটি অনুরোধের জন্য জিজ্ঞাসা করবে। এই ডিজাইনের সাহায্যে, আমরা N
পর্যন্ত অনুরোধ কনকারেন্টলি প্রক্রিয়া করতে পারি, যেখানে N
হল থ্রেডের সংখ্যা। যদি প্রতিটি থ্রেড একটি দীর্ঘ-চলমান অনুরোধের প্রতিক্রিয়া জানায়, তাহলে পরবর্তী অনুরোধগুলি এখনও কিউতে ব্যাক আপ করতে পারে, কিন্তু আমরা সেই পয়েন্টে পৌঁছানোর আগে আমরা যে দীর্ঘ-চলমান অনুরোধগুলি হ্যান্ডেল করতে পারি তার সংখ্যা বাড়িয়েছি।
এই কৌশলটি একটি ওয়েব সার্ভারের থ্রুপুট উন্নত করার অনেকগুলি উপায়ের মধ্যে একটি। আপনি যে অন্যান্য অপশনগুলি দেখতে পারেন সেগুলি হল ফর্ক/জয়েন মডেল, সিঙ্গেল-থ্রেডেড অ্যাসিঙ্ক্রোনাস I/O মডেল এবং মাল্টি-থ্রেডেড অ্যাসিঙ্ক্রোনাস I/O মডেল। আপনি যদি এই বিষয়ে আগ্রহী হন, তাহলে আপনি অন্যান্য সমাধান সম্পর্কে আরও পড়তে পারেন এবং সেগুলি ইমপ্লিমেন্ট করার চেষ্টা করতে পারেন; Rust-এর মতো একটি নিম্ন-স্তরের ভাষার সাথে, এই সমস্ত অপশন সম্ভব।
আমরা একটি থ্রেড পুল ইমপ্লিমেন্ট করা শুরু করার আগে, আসুন পুলটি ব্যবহার করা কেমন হওয়া উচিত সে সম্পর্কে কথা বলি। যখন আপনি কোড ডিজাইন করার চেষ্টা করছেন, তখন ক্লায়েন্ট ইন্টারফেসটি প্রথমে লেখা আপনার ডিজাইনকে গাইড করতে সাহায্য করতে পারে। কোডের API এমনভাবে লিখুন যাতে আপনি যেভাবে এটি কল করতে চান সেইভাবে এটি গঠিত হয়; তারপর কার্যকারিতা ইমপ্লিমেন্ট করে এবং তারপর পাবলিক API ডিজাইন করার পরিবর্তে সেই কাঠামোর মধ্যে কার্যকারিতা ইমপ্লিমেন্ট করুন।
১২ অধ্যায়ের প্রোজেক্টে আমরা যেভাবে টেস্ট-চালিত ডেভেলপমেন্ট ব্যবহার করেছি, আমরা এখানে কম্পাইলার-চালিত ডেভেলপমেন্ট ব্যবহার করব। আমরা সেই কোডটি লিখব যা আমাদের ইচ্ছামতো ফাংশনগুলিকে কল করে এবং তারপর কম্পাইলার থেকে এররগুলি দেখে আমরা নির্ধারণ করব যে কোডটি কাজ করার জন্য আমাদের পরবর্তীতে কী পরিবর্তন করা উচিত। তবে, আমরা এটি করার আগে, আমরা সেই কৌশলটি অন্বেষণ করব যা আমরা শুরুর পয়েন্ট হিসাবে ব্যবহার করব না।
প্রতিটি অনুরোধের জন্য একটি থ্রেড স্পন করা
প্রথমে, আসুন দেখি আমাদের কোডটি কেমন হতে পারে যদি এটি প্রতিটি কানেকশনের জন্য একটি নতুন থ্রেড তৈরি করে। আগেই যেমন উল্লেখ করা হয়েছে, এটি আমাদের চূড়ান্ত প্ল্যান নয় কারণ আনলিমিটেড সংখ্যক থ্রেড স্পন করার সমস্যা রয়েছে, তবে প্রথমে একটি কার্যকরী মাল্টিথ্রেডেড সার্ভার পাওয়ার জন্য এটি একটি শুরুর পয়েন্ট। তারপর আমরা থ্রেড পুলটিকে একটি উন্নতি হিসাবে যুক্ত করব এবং দুটি সমাধানের মধ্যে পার্থক্য করা সহজ হবে। লিস্টিং ২১-১১ for
লুপের মধ্যে প্রতিটি স্ট্রিম হ্যান্ডেল করার জন্য একটি নতুন থ্রেড স্পন করতে main
-এ যে পরিবর্তনগুলি করতে হবে তা দেখায়।
use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
আপনি যেমন ১৬ অধ্যায়ে শিখেছেন, thread::spawn
একটি নতুন থ্রেড তৈরি করবে এবং তারপর নতুন থ্রেডে ক্লোজারের কোডটি চালাবে। আপনি যদি এই কোডটি চালান এবং আপনার ব্রাউজারে /sleep লোড করেন, তারপর আরও দুটি ব্রাউজার ট্যাবে / লোড করেন, তাহলে আপনি দেখবেন যে / এর অনুরোধগুলিকে /sleep শেষ হওয়ার জন্য অপেক্ষা করতে হবে না। তবে, যেমনটি আমরা উল্লেখ করেছি, এটি অবশেষে সিস্টেমটিকে অভিভূত করবে কারণ আপনি কোনো সীমা ছাড়াই নতুন থ্রেড তৈরি করবেন।
আপনি হয়তো ১৭ অধ্যায় থেকেও মনে করতে পারেন যে এটি ঠিক সেই ধরনের পরিস্থিতি যেখানে async এবং await সত্যিই উজ্জ্বল! থ্রেড পুল তৈরি করার সময় এটি মনে রাখবেন এবং চিন্তা করুন যে async-এর সাথে জিনিসগুলি কীভাবে আলাদা বা একই রকম দেখাবে।
সীমিত সংখ্যক থ্রেড তৈরি করা
আমরা চাই আমাদের থ্রেড পুলটি একই রকম, পরিচিত উপায়ে কাজ করুক যাতে থ্রেড থেকে থ্রেড পুলে পরিবর্তন করার জন্য আমাদের API ব্যবহার করা কোডে বড় পরিবর্তনের প্রয়োজন না হয়। লিস্টিং ২১-১২ একটি ThreadPool
স্ট্রাক্টের জন্য অনুমানমূলক ইন্টারফেস দেখায় যা আমরা thread::spawn
-এর পরিবর্তে ব্যবহার করতে চাই।
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
আমরা ThreadPool::new
ব্যবহার করে একটি কনফিগারযোগ্য সংখ্যক থ্রেড সহ একটি নতুন থ্রেড পুল তৈরি করি, এই ক্ষেত্রে চারটি। তারপর, for
লুপে, pool.execute
-এর thread::spawn
-এর মতোই একটি ইন্টারফেস রয়েছে, যেখানে এটি পুলের প্রতিটি স্ট্রিমের জন্য চালানো উচিত এমন একটি ক্লোজার নেয়। আমাদের pool.execute
ইমপ্লিমেন্ট করতে হবে যাতে এটি ক্লোজারটি নেয় এবং চালানোর জন্য পুলের একটি থ্রেডকে দেয়। এই কোডটি এখনও কম্পাইল হবে না, কিন্তু আমরা চেষ্টা করব যাতে কম্পাইলার আমাদের এটি কীভাবে ঠিক করতে হয় সে সম্পর্কে গাইড করতে পারে।
কম্পাইলার ড্রাইভেন ডেভেলপমেন্ট ব্যবহার করে ThreadPool
তৈরি করা
লিস্টিং 21-12-এ src/main.rs-এ পরিবর্তন করুন, এবং তারপর cargo check
থেকে কম্পাইলার এররগুলিকে আমাদের ডেভেলপমেন্ট ড্রাইভ করার জন্য ব্যবহার করি। আমরা যে প্রথম এররটি পাই তা এখানে:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
দারুণ! এই এররটি আমাদের বলে যে আমাদের একটি ThreadPool
টাইপ বা মডিউল প্রয়োজন, তাই আমরা এখন একটি তৈরি করব। আমাদের ThreadPool
ইমপ্লিমেন্টেশন আমাদের ওয়েব সার্ভার যে ধরনের কাজ করছে তার থেকে স্বাধীন হবে। তাই আসুন hello
ক্রেটটিকে একটি বাইনারি ক্রেট থেকে একটি লাইব্রেরি ক্রেটে পরিবর্তন করি যাতে আমাদের ThreadPool
ইমপ্লিমেন্টেশন থাকে। আমরা একটি লাইব্রেরি ক্রেটে পরিবর্তন করার পরে, আমরা ওয়েব অনুরোধগুলি পরিবেশন করার জন্য নয়, থ্রেড পুল ব্যবহার করে আমরা যে কোনও কাজ করতে চাই তার জন্যও পৃথক থ্রেড পুল লাইব্রেরি ব্যবহার করতে পারি।
একটি src/lib.rs ফাইল তৈরি করুন যাতে নিম্নলিখিতগুলি রয়েছে, যা একটি ThreadPool
স্ট্রাক্টের সবচেয়ে সহজ সংজ্ঞা যা আমরা আপাতত রাখতে পারি:
pub struct ThreadPool;
তারপর লাইব্রেরি ক্রেট থেকে ThreadPool
কে স্কোপে আনতে src/main.rs ফাইলের শীর্ষে নিম্নলিখিত কোড যোগ করে main.rs ফাইলটি এডিট করুন:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
এই কোডটি এখনও কাজ করবে না, তবে আসুন এটিকে আবার চেক করি যাতে আমাদের পরবর্তী এররটি পাওয়া যায়, যেটিকে আমাদের অ্যাড্রেস করতে হবে:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
এই এররটি ইঙ্গিত দেয় যে এরপরে আমাদের ThreadPool
-এর জন্য new
নামে একটি অ্যাসোসিয়েটেড ফাংশন তৈরি করতে হবে। আমরা জানি যে new
-এর একটি প্যারামিটার থাকতে হবে যা 4
কে আর্গুমেন্ট হিসাবে গ্রহণ করতে পারে এবং একটি ThreadPool
ইন্সট্যান্স রিটার্ন করা উচিত। আসুন সবচেয়ে সহজ new
ফাংশনটি ইমপ্লিমেন্ট করি যার এই বৈশিষ্ট্যগুলি থাকবে:
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
আমরা size
প্যারামিটারের টাইপ হিসাবে usize
বেছে নিয়েছি কারণ আমরা জানি যে নেগেটিভ সংখ্যক থ্রেডের কোনো অর্থ নেই। আমরা জানি যে আমরা এই 4
কে থ্রেডগুলির একটি কালেকশনের এলিমেন্টের সংখ্যা হিসাবে ব্যবহার করব, যেটি usize
টাইপের কাজ, যেমনটি তৃতীয় অধ্যায়ের “পূর্ণসংখ্যার প্রকারভেদ” এ আলোচনা করা হয়েছে।
আসুন আবার কোডটি চেক করি:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
এখন এররটি ঘটেছে কারণ ThreadPool
-এ আমাদের কোনো execute
মেথড নেই। “সীমাবদ্ধ সংখ্যক থ্রেড তৈরি করা” থেকে মনে করুন যে আমরা সিদ্ধান্ত নিয়েছি আমাদের থ্রেড পুলের thread::spawn
-এর মতোই একটি ইন্টারফেস থাকা উচিত। এছাড়াও, আমরা execute
ফাংশনটি ইমপ্লিমেন্ট করব যাতে এটি যে ক্লোজারটি পায় সেটি নেয় এবং চালানোর জন্য পুলের একটি অলস থ্রেডকে দেয়।
আমরা ThreadPool
-এ execute
মেথডটিকে সংজ্ঞায়িত করব যাতে এটি একটি প্যারামিটার হিসাবে একটি ক্লোজার নেয়। ত্রয়োদশ অধ্যায়ের “ক্লোজার থেকে ক্যাপচার করা ভ্যালুগুলিকে সরানো এবং Fn
ট্রেইট” থেকে মনে করুন যে আমরা তিনটি ভিন্ন ট্রেইট সহ ক্লোজারগুলিকে প্যারামিটার হিসাবে নিতে পারি: Fn
, FnMut
এবং FnOnce
। আমাদের এখানে কোন ধরনের ক্লোজার ব্যবহার করতে হবে তা নির্ধারণ করতে হবে। আমরা জানি যে আমরা শেষ পর্যন্ত স্ট্যান্ডার্ড লাইব্রেরির thread::spawn
ইমপ্লিমেন্টেশনের মতোই কিছু করব, তাই আমরা দেখতে পারি thread::spawn
-এর সিগনেচারের তার প্যারামিটারের উপর কী কী বাউন্ড রয়েছে। ডকুমেন্টেশন আমাদের নিম্নলিখিতগুলি দেখায়:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
F
টাইপ প্যারামিটারটি হল যেটি নিয়ে আমরা এখানে চিন্তিত; T
টাইপ প্যারামিটারটি রিটার্ন ভ্যালুর সাথে সম্পর্কিত এবং আমরা সেটি নিয়ে চিন্তিত নই। আমরা দেখতে পাচ্ছি যে spawn
FnOnce
কে F
-এর উপর ট্রেইট বাউন্ড হিসাবে ব্যবহার করে। এটি সম্ভবত আমরাও চাই, কারণ আমরা অবশেষে execute
-এ যে আর্গুমেন্টটি পাই সেটি spawn
-এ পাস করব। আমরা আরও নিশ্চিত হতে পারি যে FnOnce
হল সেই ট্রেইট যা আমরা ব্যবহার করতে চাই কারণ একটি অনুরোধ চালানোর থ্রেডটি শুধুমাত্র সেই অনুরোধের ক্লোজারটি একবার চালাবে, যা FnOnce
-এর Once
-এর সাথে মেলে।
F
টাইপ প্যারামিটারের Send
ট্রেইট বাউন্ড এবং 'static
লাইফটাইম বাউন্ডও রয়েছে, যা আমাদের পরিস্থিতিতে দরকারী: আমাদের ক্লোজারটিকে একটি থ্রেড থেকে অন্য থ্রেডে স্থানান্তর করার জন্য Send
প্রয়োজন এবং 'static
কারণ আমরা জানি না থ্রেডটি এক্সিকিউট হতে কতক্ষণ সময় নেবে। আসুন ThreadPool
-এ একটি execute
মেথড তৈরি করি যা এই বাউন্ডগুলি সহ F
টাইপের একটি জেনেরিক প্যারামিটার নেবে:
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
আমরা এখনও FnOnce
-এর পরে ()
ব্যবহার করি কারণ এই FnOnce
এমন একটি ক্লোজারকে উপস্থাপন করে যা কোনো প্যারামিটার নেয় না এবং ইউনিট টাইপ ()
রিটার্ন করে। ফাংশন সংজ্ঞার মতোই, রিটার্ন টাইপটি সিগনেচার থেকে বাদ দেওয়া যেতে পারে, কিন্তু আমাদের কোনো প্যারামিটার না থাকলেও, আমাদের এখনও প্যারেন্থেসিস প্রয়োজন।
আবারও, এটি execute
মেথডের সবচেয়ে সহজ ইমপ্লিমেন্টেশন: এটি কিছুই করে না, তবে আমরা কেবল আমাদের কোড কম্পাইল করার চেষ্টা করছি। আসুন এটি আবার চেক করি:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
এটি কম্পাইল হয়! তবে মনে রাখবেন যে আপনি যদি cargo run
চেষ্টা করেন এবং ব্রাউজারে একটি অনুরোধ করেন, তাহলে আপনি ব্রাউজারে সেই এররগুলি দেখতে পাবেন যা আমরা এই অধ্যায়ের শুরুতে দেখেছিলাম। আমাদের লাইব্রেরি এখনও execute
-এ পাস করা ক্লোজারটিকে কল করছে না!
দ্রষ্টব্য: কঠোর কম্পাইলার সহ ভাষাগুলি, যেমন হাস্কেল (Haskell) এবং Rust সম্পর্কে আপনি যে উক্তিটি শুনতে পারেন তা হল "যদি কোড কম্পাইল হয়, তাহলে এটি কাজ করে।" কিন্তু এই উক্তিটি সর্বজনীনভাবে সত্য নয়। আমাদের প্রোজেক্ট কম্পাইল হয়, কিন্তু এটি একেবারে কিছুই করে না! যদি আমরা একটি বাস্তব, সম্পূর্ণ প্রোজেক্ট তৈরি করতাম, তাহলে কোডটি কম্পাইল হয় এবং আমাদের ইচ্ছামতো আচরণ করে কিনা তা পরীক্ষা করার জন্য ইউনিট টেস্ট লেখা শুরু করার জন্য এটি একটি ভাল সময় হবে।
বিবেচনা করুন: আমরা যদি ক্লোজারের পরিবর্তে একটি ফিউচার এক্সিকিউট করতাম তবে এখানে কী আলাদা হত?
new
-এ থ্রেডের সংখ্যা ভ্যালিডেট করা
আমরা new
এবং execute
-এর প্যারামিটারগুলির সাথে কিছুই করছি না। আসুন এই ফাংশনগুলির বডিগুলিকে আমাদের ইচ্ছামতো আচরণ দিয়ে ইমপ্লিমেন্ট করি। শুরু করতে, আসুন new
সম্পর্কে চিন্তা করি। এর আগে আমরা size
প্যারামিটারের জন্য একটি আনসাইনড টাইপ বেছে নিয়েছিলাম কারণ নেগেটিভ সংখ্যক থ্রেড সহ একটি পুলের কোনো মানে হয় না। যাইহোক, শূন্য থ্রেড সহ একটি পুলেরও কোনো মানে হয় না, তবুও শূন্য একটি সম্পূর্ণ বৈধ usize
। আমরা কোড যোগ করব যাতে size
শূন্যের চেয়ে বড় কিনা তা পরীক্ষা করার জন্য আমরা একটি ThreadPool
ইন্সট্যান্স রিটার্ন করার আগে এবং assert!
ম্যাক্রো ব্যবহার করে শূন্য পেলে প্রোগ্রামটি প্যানিক করে, যেমনটি লিস্টিং ২১-১৩-তে দেখানো হয়েছে।
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
আমরা এখানে যেভাবে assert!
ম্যাক্রো যোগ করেছি, তার পরিবর্তে, আমরা new
কে build
-এ পরিবর্তন করতে পারতাম এবং একটি Result
রিটার্ন করতে পারতাম, যেমনটি আমরা I/O প্রোজেক্টে লিস্টিং ১২-৯-এ Config::build
-এর সাথে করেছি। কিন্তু আমরা এই ক্ষেত্রে সিদ্ধান্ত নিয়েছি যে কোনো থ্রেড ছাড়াই একটি থ্রেড পুল তৈরি করার চেষ্টা একটি পুনরুদ্ধার অযোগ্য এরর হওয়া উচিত। আপনি যদি উচ্চাকাঙ্ক্ষী হন, তাহলে new
ফাংশনের সাথে তুলনা করার জন্য নিম্নলিখিত সিগনেচার সহ build
নামে একটি ফাংশন লেখার চেষ্টা করুন:
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
থ্রেড স্টোর করার জন্য জায়গা তৈরি করা
এখন যেহেতু পুলটিতে স্টোর করার জন্য আমাদের কাছে বৈধ সংখ্যক থ্রেড আছে, তাই আমরা সেই থ্রেডগুলি তৈরি করতে পারি এবং স্ট্রাক্টটি রিটার্ন করার আগে ThreadPool
স্ট্রাক্টে সেগুলি স্টোর করতে পারি। কিন্তু আমরা কীভাবে একটি থ্রেড "স্টোর" করব? আসুন thread::spawn
-এর সিগনেচারটি আবার দেখি:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
spawn
ফাংশনটি একটি JoinHandle<T>
রিটার্ন করে, যেখানে T
হল সেই টাইপ যা ক্লোজারটি রিটার্ন করে। আসুন JoinHandle
ব্যবহার করার চেষ্টা করি এবং দেখি কী হয়। আমাদের ক্ষেত্রে, থ্রেড পুলে আমরা যে ক্লোজারগুলি পাস করছি সেগুলি কানেকশন হ্যান্ডেল করবে এবং কিছুই রিটার্ন করবে না, তাই T
হবে ইউনিট টাইপ ()
।
লিস্টিং ২১-১৪-এর কোডটি কম্পাইল হবে কিন্তু এখনও কোনো থ্রেড তৈরি করে না। আমরা ThreadPool
-এর সংজ্ঞা পরিবর্তন করে thread::JoinHandle<()>
ইন্সট্যান্সের একটি ভেক্টর রেখেছি, size
ক্যাপাসিটি সহ ভেক্টরটিকে ইনিশিয়ালাইজ করেছি, একটি for
লুপ সেট আপ করেছি যা থ্রেড তৈরি করার জন্য কিছু কোড চালাবে এবং সেগুলি ধারণকারী একটি ThreadPool
ইন্সট্যান্স রিটার্ন করেছি।
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
আমরা লাইব্রেরি ক্রেটে std::thread
কে স্কোপে এনেছি কারণ আমরা ThreadPool
-এর ভেক্টরের আইটেমগুলির টাইপ হিসাবে thread::JoinHandle
ব্যবহার করছি।
একবার একটি বৈধ সাইজ পাওয়া গেলে, আমাদের ThreadPool
একটি নতুন ভেক্টর তৈরি করে যা size
সংখ্যক আইটেম ধারণ করতে পারে। with_capacity
ফাংশনটি Vec::new
-এর মতোই কাজ করে, কিন্তু একটি গুরুত্বপূর্ণ পার্থক্য সহ: এটি ভেক্টরে আগে থেকে জায়গা বরাদ্দ করে। যেহেতু আমরা জানি যে আমাদের ভেক্টরে size
সংখ্যক এলিমেন্ট স্টোর করতে হবে, তাই আগে থেকে এই অ্যালোকেশন করা Vec::new
ব্যবহার করার চেয়ে কিছুটা বেশি কার্যকর, যা এলিমেন্ট যোগ করার সাথে সাথে নিজে থেকেই রিসাইজ হয়।
আপনি যখন আবার cargo check
চালাবেন, তখন এটি সফল হওয়া উচিত।
ThreadPool
থেকে একটি থ্রেডে কোড পাঠানোর জন্য দায়িত্বশীল একটি Worker
স্ট্রাক্ট
আমরা লিস্টিং ২১-১৪-তে থ্রেড তৈরির বিষয়ে for
লুপে একটি কমেন্ট রেখেছিলাম। এখানে, আমরা দেখব কিভাবে আমরা আসলে থ্রেড তৈরি করি। স্ট্যান্ডার্ড লাইব্রেরি থ্রেড তৈরি করার উপায় হিসাবে thread::spawn
সরবরাহ করে এবং thread::spawn
আশা করে যে থ্রেডটি তৈরি হওয়ার সাথে সাথেই চালানোর জন্য কিছু কোড পাবে। যাইহোক, আমাদের ক্ষেত্রে, আমরা থ্রেডগুলি তৈরি করতে চাই এবং তাদের অপেক্ষা করাতে চাই সেই কোডের জন্য যা আমরা পরে পাঠাব। থ্রেডের স্ট্যান্ডার্ড লাইব্রেরির ইমপ্লিমেন্টেশনে এটি করার কোনো উপায় নেই; আমাদের এটি ম্যানুয়ালি ইমপ্লিমেন্ট করতে হবে।
আমরা ThreadPool
এবং থ্রেডগুলির মধ্যে একটি নতুন ডেটা স্ট্রাকচার উপস্থাপন করে এই আচরণটি ইমপ্লিমেন্ট করব যা এই নতুন আচরণটি পরিচালনা করবে। আমরা এই ডেটা স্ট্রাকচারটিকে Worker বলব, যা পুলিং ইমপ্লিমেন্টেশনে একটি সাধারণ শব্দ। Worker
সেই কোডটি তুলে নেয় যা চালানো দরকার এবং Worker এর থ্রেডে কোডটি চালায়।
একটি রেস্তোরাঁর রান্নাঘরে কাজ করা লোকেদের কথা ভাবুন: কর্মীরা গ্রাহকদের কাছ থেকে অর্ডার আসার জন্য অপেক্ষা করে এবং তারপর সেই অর্ডারগুলি নেওয়া এবং সেগুলি পূরণ করার জন্য তারা দায়ী থাকে।
থ্রেড পুলে JoinHandle<()>
ইন্সট্যান্সের একটি ভেক্টর সংরক্ষণ করার পরিবর্তে, আমরা Worker
স্ট্রাক্টের ইন্সট্যান্স সংরক্ষণ করব। প্রতিটি Worker
একটি একক JoinHandle<()>
ইন্সট্যান্স সংরক্ষণ করবে। তারপর আমরা Worker
-এ একটি মেথড ইমপ্লিমেন্ট করব যা চালানোর জন্য কোডের একটি ক্লোজার নেবে এবং এক্সিকিউশনের জন্য ইতিমধ্যে চলমান থ্রেডে পাঠাবে। লগিং বা ডিবাগিং করার সময় আমরা পুলের বিভিন্ন Worker
ইন্সট্যান্সের মধ্যে পার্থক্য করার জন্য প্রতিটি Worker
-কে একটি id
দেব।
এখানে নতুন প্রক্রিয়াটি রয়েছে যা ঘটবে যখন আমরা একটি ThreadPool
তৈরি করব। আমরা এইভাবে Worker
সেট আপ করার পরে থ্রেডে ক্লোজার পাঠানোর কোডটি ইমপ্লিমেন্ট করব:
- একটি
Worker
স্ট্রাক্ট সংজ্ঞায়িত করুন যা একটিid
এবং একটিJoinHandle<()>
ধারণ করে। Worker
ইন্সট্যান্সের একটি ভেক্টর ধারণ করতেThreadPool
পরিবর্তন করুন।- একটি
Worker::new
ফাংশন সংজ্ঞায়িত করুন যা একটিid
নম্বর নেয় এবং একটিWorker
ইন্সট্যান্স রিটার্ন করে যাid
এবং একটি খালি ক্লোজার দিয়ে স্পন করা থ্রেড ধারণ করে। ThreadPool::new
-তে, একটিid
জেনারেট করতেfor
লুপ কাউন্টার ব্যবহার করুন, সেইid
দিয়ে একটি নতুনWorker
তৈরি করুন এবং ভেক্টরে worker-কে স্টোর করুন।
আপনি যদি চ্যালেঞ্জের জন্য প্রস্তুত হন, তাহলে লিস্টিং ২১-১৫-এর কোড দেখার আগে নিজে থেকে এই পরিবর্তনগুলি ইমপ্লিমেন্ট করার চেষ্টা করুন।
প্রস্তুত? পূর্ববর্তী পরিবর্তনগুলি করার একটি উপায় সহ লিস্টিং ২১-১৫ এখানে রয়েছে।
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
আমরা ThreadPool
-এ ফিল্ডের নাম threads
থেকে workers
-এ পরিবর্তন করেছি কারণ এটি এখন JoinHandle<()>
ইন্সট্যান্সের পরিবর্তে Worker
ইন্সট্যান্স ধারণ করছে। আমরা Worker::new
-তে আর্গুমেন্ট হিসাবে for
লুপের কাউন্টার ব্যবহার করি এবং আমরা প্রতিটি নতুন Worker
কে workers
নামের ভেক্টরে সংরক্ষণ করি।
বাহ্যিক কোড (যেমন src/main.rs-এ আমাদের সার্ভার) ThreadPool
-এর মধ্যে একটি Worker
স্ট্রাক্ট ব্যবহার সম্পর্কিত ইমপ্লিমেন্টেশনের বিবরণ জানার প্রয়োজন নেই, তাই আমরা Worker
স্ট্রাক্ট এবং এর new
ফাংশনকে প্রাইভেট করি। Worker::new
ফাংশনটি আমরা যে id
দিই সেটি ব্যবহার করে এবং একটি খালি ক্লোজার ব্যবহার করে একটি নতুন থ্রেড স্পন করে তৈরি করা একটি JoinHandle<()>
ইন্সট্যান্স সংরক্ষণ করে।
দ্রষ্টব্য: যদি অপারেটিং সিস্টেম পর্যাপ্ত সিস্টেম রিসোর্সের অভাবে একটি থ্রেড তৈরি করতে না পারে, তাহলে
thread::spawn
প্যানিক করবে। এটি আমাদের পুরো সার্ভারকে প্যানিক করে তুলবে, যদিও কিছু থ্রেড তৈরি করা সফল হতে পারে। সরলতার খাতিরে, এই আচরণটি ঠিক আছে, কিন্তু একটি প্রোডাকশন থ্রেড পুল ইমপ্লিমেন্টেশনে, আপনি সম্ভবতstd::thread::Builder
এবং এরspawn
মেথড ব্যবহার করতে চাইবেন যাResult
রিটার্ন করে।
এই কোডটি কম্পাইল হবে এবং ThreadPool::new
-তে আর্গুমেন্ট হিসাবে নির্দিষ্ট করা Worker
ইন্সট্যান্সের সংখ্যা স্টোর করবে। কিন্তু আমরা এখনও execute
-এ পাওয়া ক্লোজারটি প্রক্রিয়া করছি না। আসুন দেখি কিভাবে এটি করতে হয়।
চ্যানেলগুলির মাধ্যমে থ্রেডগুলিতে অনুরোধ পাঠানো
আমরা যে পরবর্তী সমস্যাটির সমাধান করব তা হল thread::spawn
-কে দেওয়া ক্লোজারগুলি কিছুই করে না। বর্তমানে, আমরা execute
মেথডে যে ক্লোজারটি এক্সিকিউট করতে চাই সেটি পাই। কিন্তু ThreadPool
তৈরির সময় প্রতিটি Worker
তৈরি করার সময় আমাদের thread::spawn
-কে চালানোর জন্য একটি ক্লোজার দিতে হবে।
আমরা চাই যে Worker
স্ট্রাক্টগুলি যা আমরা এইমাত্র তৈরি করেছি সেগুলি ThreadPool
-এ থাকা একটি কিউ থেকে চালানোর জন্য কোড ফেচ করুক এবং সেই কোডটি চালানোর জন্য তার থ্রেডে পাঠাক।
ষোড়শ অধ্যায়ে আমরা যে চ্যানেলগুলি সম্পর্কে শিখেছি—দুটি থ্রেডের মধ্যে যোগাযোগের একটি সহজ উপায়—এই ব্যবহারের ক্ষেত্রে উপযুক্ত হবে। আমরা কাজের কিউ হিসাবে কাজ করার জন্য একটি চ্যানেল ব্যবহার করব, এবং execute
ThreadPool
থেকে Worker
ইন্সট্যান্সে একটি কাজ পাঠাবে, যা কাজটি তার থ্রেডে পাঠাবে। এখানে পরিকল্পনাটি রয়েছে:
ThreadPool
একটি চ্যানেল তৈরি করবে এবং সেন্ডারকে ধরে রাখবে।- প্রতিটি
Worker
রিসিভারকে ধরে রাখবে। - আমরা একটি নতুন
Job
স্ট্রাক্ট তৈরি করব যা চ্যানেলটিতে পাঠাতে চাওয়া ক্লোজারগুলিকে ধারণ করবে। execute
মেথডটি যে কাজটি এক্সিকিউট করতে চায় সেটি সেন্ডারের মাধ্যমে পাঠাবে।- তার থ্রেডে,
Worker
তার রিসিভারের উপর লুপ করবে এবং প্রাপ্ত যেকোনো কাজের ক্লোজার এক্সিকিউট করবে।
আসুন ThreadPool::new
-তে একটি চ্যানেল তৈরি করে এবং ThreadPool
ইন্সট্যান্সে সেন্ডারটিকে ধরে রেখে শুরু করি, যেমনটি লিস্টিং ২১-১৬-তে দেখানো হয়েছে। Job
স্ট্রাক্ট আপাতত কিছুই ধারণ করে না তবে এটি চ্যানেলে পাঠানো আইটেমটির টাইপ হবে।
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool::new
-তে, আমরা আমাদের নতুন চ্যানেল তৈরি করি এবং পুলটিকে সেন্ডার ধরে রাখতে দিই। এটি সফলভাবে কম্পাইল হবে।
আসুন থ্রেড পুল চ্যানেল তৈরি করার সাথে সাথে প্রতিটি Worker
-এ চ্যানেলের একটি রিসিভার পাস করার চেষ্টা করি। আমরা জানি যে আমরা Worker
ইন্সট্যান্সগুলি যে থ্রেড স্পন করে তাতে রিসিভারটি ব্যবহার করতে চাই, তাই আমরা ক্লোজারে receiver
প্যারামিটারটি রেফারেন্স করব। লিস্টিং ২১-১৭-এর কোডটি এখনও পুরোপুরি কম্পাইল হবে না।
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
আমরা কিছু ছোট এবং সহজবোধ্য পরিবর্তন করেছি: আমরা Worker::new
-তে রিসিভারটি পাস করি এবং তারপর আমরা এটি ক্লোজারের ভিতরে ব্যবহার করি।
যখন আমরা এই কোডটি চেক করার চেষ্টা করি, তখন আমরা এই এররটি পাই:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
কোডটি একাধিক Worker
ইন্সট্যান্সে receiver
পাস করার চেষ্টা করছে। এটি কাজ করবে না, যেমনটি আপনি ষোড়শ অধ্যায় থেকে মনে করবেন: Rust যে চ্যানেল ইমপ্লিমেন্টেশন সরবরাহ করে তা হল মাল্টিপল প্রোডিউসার, সিঙ্গেল কনজিউমার। এর মানে হল আমরা এই কোডটি ঠিক করার জন্য চ্যানেলের কনজিউমিং প্রান্তটি ক্লোন করতে পারি না। আমরা একাধিক কনজিউমারের কাছে একাধিকবার একটি মেসেজ পাঠাতে চাই না; আমরা একাধিক Worker
ইন্সট্যান্স সহ মেসেজের একটি তালিকা চাই যাতে প্রতিটি মেসেজ একবার প্রক্রিয়া করা হয়।
অতিরিক্তভাবে, চ্যানেল কিউ থেকে একটি কাজ নেওয়া receiver
-কে মিউটেট করে, তাই থ্রেডগুলির receiver
শেয়ার এবং মডিফাই করার একটি নিরাপদ উপায় প্রয়োজন; অন্যথায়, আমরা রেস কন্ডিশন পেতে পারি (যেমনটি ষোড়শ অধ্যায়ে আলোচনা করা হয়েছে)।
ষোড়শ অধ্যায়ে আলোচিত থ্রেড-নিরাপদ স্মার্ট পয়েন্টারগুলির কথা মনে করুন: একাধিক থ্রেড জুড়ে ওনারশিপ শেয়ার করতে এবং থ্রেডগুলিকে ভ্যালু মিউটেট করার অনুমতি দেওয়ার জন্য, আমাদের Arc<Mutex<T>>
ব্যবহার করতে হবে। Arc
টাইপ একাধিক Worker
ইন্সট্যান্সকে রিসিভারের মালিক হতে দেবে এবং Mutex
নিশ্চিত করবে যে একবারে শুধুমাত্র একটি Worker
রিসিভার থেকে একটি কাজ পায়। লিস্টিং ২১-১৮ আমাদের যে পরিবর্তনগুলি করতে হবে তা দেখায়।
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
ThreadPool::new
-তে, আমরা রিসিভারটিকে একটি Arc
এবং একটি Mutex
-এ রাখি। প্রতিটি নতুন Worker
-এর জন্য, আমরা Arc
ক্লোন করি যাতে রেফারেন্স কাউন্ট বাড়ে, যাতে Worker
ইন্সট্যান্সগুলি রিসিভারের ওনারশিপ শেয়ার করতে পারে।
এই পরিবর্তনগুলির সাথে, কোড কম্পাইল হয়! আমরা সেখানে পৌঁছে যাচ্ছি!
execute
মেথড ইমপ্লিমেন্ট করা
আসুন অবশেষে ThreadPool
-এ execute
মেথডটি ইমপ্লিমেন্ট করি। আমরা execute
যে ধরনের ক্লোজার পায় সেটি ধারণ করে এমন একটি ট্রেইট অবজেক্টের জন্য Job
-কে একটি স্ট্রাক্ট থেকে একটি টাইপ অ্যালিয়াসে পরিবর্তন করব। যেমনটি বিংশ অধ্যায়ের “টাইপ অ্যালিয়াস সহ টাইপ সিনোনিম তৈরি করা” তে আলোচনা করা হয়েছে, টাইপ অ্যালিয়াসগুলি আমাদের ব্যবহারের সুবিধার জন্য দীর্ঘ টাইপগুলিকে ছোট করার অনুমতি দেয়। লিস্টিং ২১-১৯ দেখুন।
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
execute
-এ পাওয়া ক্লোজারটি ব্যবহার করে একটি নতুন Job
ইন্সট্যান্স তৈরি করার পরে, আমরা সেই কাজটি চ্যানেলের পাঠানোর প্রান্তে পাঠাই। পাঠানোর ক্ষেত্রে ব্যর্থ হলে আমরা send
-এ unwrap
কল করছি। উদাহরণস্বরূপ, যদি আমরা আমাদের সমস্ত থ্রেড এক্সিকিউট করা বন্ধ করে দিই, তাহলে এটি ঘটতে পারে, যার অর্থ রিসিভিং প্রান্তটি নতুন মেসেজ গ্রহণ করা বন্ধ করে দিয়েছে। এই মুহূর্তে, আমরা আমাদের থ্রেডগুলিকে এক্সিকিউট করা বন্ধ করতে পারি না: পুলটি যতদিন বিদ্যমান থাকে ততদিন আমাদের থ্রেডগুলি এক্সিকিউট করতে থাকে। আমরা unwrap
ব্যবহার করার কারণ হল আমরা জানি যে ব্যর্থতার ঘটনা ঘটবে না, কিন্তু কম্পাইলার তা জানে না।
কিন্তু আমরা এখনও পুরোপুরি শেষ করিনি! Worker
-এ, thread::spawn
-কে পাস করা আমাদের ক্লোজারটি এখনও চ্যানেলের রিসিভিং প্রান্তটিকে রেফারেন্স করে। পরিবর্তে, আমাদের ক্লোজারটিকে চিরতরে লুপ করতে হবে, চ্যানেলের রিসিভিং প্রান্তটিকে একটি কাজের জন্য জিজ্ঞাসা করতে হবে এবং যখন এটি একটি কাজ পাবে তখন সেটি চালাতে হবে। আসুন Worker::new
-তে লিস্টিং ২১-২০-তে দেখানো পরিবর্তনটি করি।
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
এখানে, আমরা প্রথমে মিউটেক্স অর্জন করতে receiver
-এ lock
কল করি এবং তারপর কোনো এরর-এ প্যানিক করার জন্য unwrap
কল করি। যদি মিউটেক্সটি একটি পয়জনড অবস্থায় থাকে তবে লক অর্জন করা ব্যর্থ হতে পারে, যা ঘটতে পারে যদি অন্য কোনো থ্রেড লকটি রিলিজ করার পরিবর্তে লকটি ধরে রাখার সময় প্যানিক করে। এই পরিস্থিতিতে, এই থ্রেডটিকে প্যানিক করার জন্য unwrap
কল করা হল সঠিক কাজ। আপনার জন্য অর্থপূর্ণ একটি এরর মেসেজ সহ এই unwrap
কে একটি expect
-এ পরিবর্তন করতে পারেন।
যদি আমরা মিউটেক্সের উপর লক পাই, তাহলে আমরা চ্যানেল থেকে একটি Job
রিসিভ করার জন্য recv
কল করি। একটি ফাইনাল unwrap
এখানেও যেকোনো এরর অতিক্রম করে, যা ঘটতে পারে যদি সেন্ডার ধারণ করা থ্রেডটি বন্ধ হয়ে যায়, একইভাবে send
মেথড Err
রিটার্ন করে যদি রিসিভার বন্ধ হয়ে যায়।
recv
-তে কলটি ব্লক করে, তাই যদি এখনও কোনো কাজ না থাকে, তাহলে বর্তমান থ্রেডটি একটি কাজ উপলব্ধ না হওয়া পর্যন্ত অপেক্ষা করবে। Mutex<T>
নিশ্চিত করে যে একবারে শুধুমাত্র একটি Worker
থ্রেড একটি কাজের অনুরোধ করার চেষ্টা করছে।
আমাদের থ্রেড পুলটি এখন একটি কার্যকরী অবস্থায় রয়েছে! এটিকে একটি cargo run
দিন এবং কিছু অনুরোধ করুন:
$ cargo run
Compiling hello v.1.0 (file:///projects/hello)
warning: field `workers` is never read
--> src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec<Worker>,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--> src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle<()>,
| ^^^^^^
warning: `hello` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
সফল! আমাদের এখন একটি থ্রেড পুল রয়েছে যা অ্যাসিঙ্ক্রোনাসভাবে কানেকশন এক্সিকিউট করে। কখনও চারটির বেশি থ্রেড তৈরি হয় না, তাই সার্ভার প্রচুর অনুরোধ পেলেও আমাদের সিস্টেম ওভারলোড হবে না। যদি আমরা /sleep-এ একটি অনুরোধ করি, তাহলে সার্ভার অন্য থ্রেড চালিয়ে অন্য অনুরোধগুলি পরিবেশন করতে সক্ষম হবে।
দ্রষ্টব্য: আপনি যদি একই সাথে একাধিক ব্রাউজার উইন্ডোতে /sleep খোলেন, তাহলে সেগুলি পাঁচ সেকেন্ডের ব্যবধানে একবারে একটি লোড হতে পারে। কিছু ওয়েব ব্রাউজার ক্যাশিংয়ের কারণে একই অনুরোধের একাধিক ইন্সট্যান্স ক্রমানুসারে এক্সিকিউট করে। এই সীমাবদ্ধতা আমাদের ওয়েব সার্ভারের কারণে নয়।
এখানে থামার এবং লিস্টিং 21-18, 21-19 এবং 21-20-এর কোডগুলি কীভাবে আলাদা হত তা বিবেচনা করার জন্য এটি একটি ভাল সময়, যদি আমরা কাজ করার জন্য ক্লোজারের পরিবর্তে ফিউচার ব্যবহার করতাম। কোন টাইপগুলি পরিবর্তন হবে? মেথড স্বাক্ষরগুলি কীভাবে আলাদা হবে, যদি আদৌ হয়? কোডের কোন অংশগুলি একই থাকবে?
17 এবং 18 অধ্যায়ে while let
লুপ সম্পর্কে জানার পরে, আপনি হয়ত ভাবছেন কেন আমরা লিস্টিং 21-21-এ দেখানো ওয়ার্কার থ্রেড কোডটি লিখিনি।
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
এই কোডটি কম্পাইল এবং রান করে কিন্তু কাঙ্ক্ষিত থ্রেডিং আচরণ দেয় না: একটি স্লো অনুরোধ এখনও অন্যান্য অনুরোধগুলিকে প্রসেস হওয়ার জন্য অপেক্ষা করাবে। কারণটি কিছুটা সূক্ষ্ম: Mutex
স্ট্রাক্টের কোনো পাবলিক unlock
মেথড নেই কারণ লকের ওনারশিপ lock
মেথড রিটার্ন করা LockResult<MutexGuard<T>>
-এর মধ্যে MutexGuard<T>
-এর লাইফটাইমের উপর ভিত্তি করে। কম্পাইল করার সময়, বরো চেকার তখন এই নিয়মটি প্রয়োগ করতে পারে যে একটি Mutex
দ্বারা সুরক্ষিত একটি রিসোর্স অ্যাক্সেস করা যাবে না যদি না আমরা লকটি ধরে রাখি। যাইহোক, যদি আমরা MutexGuard<T>
-এর লাইফটাইম সম্পর্কে সচেতন না হই, তাহলে এই ইমপ্লিমেন্টেশনটি ইচ্ছার চেয়ে বেশি সময় ধরে লক ধরে রাখতে পারে।
লিস্টিং 21-20-এর কোডটি let job = receiver.lock().unwrap().recv().unwrap();
ব্যবহার করে কাজ করে কারণ let
-এর সাথে, সমান চিহ্নের ডান পাশের এক্সপ্রেশনে ব্যবহৃত যেকোনো টেম্পোরারি ভ্যালু let
স্টেটমেন্ট শেষ হওয়ার সাথে সাথেই ড্রপ হয়ে যায়। যাইহোক, while let
(এবং if let
এবং match
) অ্যাসোসিয়েটেড ব্লকের শেষ না হওয়া পর্যন্ত টেম্পোরারি ভ্যালু ড্রপ করে না। লিস্টিং 21-21-এ, job()
-তে কলের সময়কাল ধরে লকটি ধরে রাখা হয়, যার অর্থ অন্য Worker
ইন্সট্যান্সগুলি কাজ পেতে পারে না।