থ্রেডের মধ্যে ডেটা স্থানান্তরের জন্য মেসেজ পাসিং ব্যবহার করা
নিরাপদ কনকারেন্সি নিশ্চিত করার একটি ক্রমবর্ধমান জনপ্রিয় পদ্ধতি হলো মেসেজ পাসিং (message passing), যেখানে থ্রেড বা অ্যাক্টররা ডেটা সহ মেসেজ একে অপরকে পাঠিয়ে যোগাযোগ করে। গো ল্যাঙ্গুয়েজের ডকুমেন্টেশন থেকে একটি স্লোগান দিয়ে ধারণাটি এখানে দেওয়া হলো: "মেমরি শেয়ার করে যোগাযোগ করবেন না; পরিবর্তে, যোগাযোগের মাধ্যমে মেমরি শেয়ার করুন।"
মেসেজ-সেন্ডিং কনকারেন্সি সম্পন্ন করার জন্য, Rust-এর স্ট্যান্ডার্ড লাইব্রেরি চ্যানেলগুলোর একটি ইমপ্লিমেন্টেশন সরবরাহ করে। একটি চ্যানেল (channel) হলো একটি সাধারণ প্রোগ্রামিং ধারণা যার মাধ্যমে ডেটা এক থ্রেড থেকে অন্য থ্রেডে পাঠানো হয়।
আপনি প্রোগ্রামিংয়ে একটি চ্যানেলকে জলের একটি দিকনির্দেশক চ্যানেলের মতো কল্পনা করতে পারেন, যেমন একটি স্রোত বা নদী। আপনি যদি একটি রাবারের হাঁসের মতো কিছু একটি নদীতে রাখেন, তবে এটি স্রোতের সাথে জলপথের শেষ পর্যন্ত ভ্রমণ করবে।
একটি চ্যানেলের দুটি অর্ধেক থাকে: একটি ট্রান্সমিটার এবং একটি রিসিভার। ট্রান্সমিটারের অর্ধেকটি হলো উজানের অবস্থান যেখানে আপনি রাবারের হাঁসটিকে নদীতে রাখেন, এবং রিসিভারের অর্ধেকটি হলো যেখানে রাবারের হাঁসটি স্রোতের শেষে পৌঁছায়। আপনার কোডের একটি অংশ আপনি যে ডেটা পাঠাতে চান তা দিয়ে ট্রান্সমিটারের মেথড কল করে, এবং অন্য একটি অংশ আগত মেসেজের জন্য রিসিভিং প্রান্তটি পরীক্ষা করে। যদি ট্রান্সমিটার বা রিসিভারের অর্ধেকটি ড্রপ করা হয় তবে একটি চ্যানেলকে বন্ধ (closed) বলা হয়।
এখানে, আমরা এমন একটি প্রোগ্রামের দিকে কাজ করব যেখানে একটি থ্রেড ভ্যালু তৈরি করে এবং সেগুলোকে একটি চ্যানেলের মাধ্যমে পাঠায়, এবং অন্য একটি থ্রেড সেই ভ্যালুগুলো গ্রহণ করে এবং সেগুলো প্রিন্ট করে। আমরা ফিচারটি চিত্রিত করার জন্য একটি চ্যানেল ব্যবহার করে থ্রেডগুলোর মধ্যে সহজ ভ্যালু পাঠাব। একবার আপনি এই কৌশলের সাথে পরিচিত হয়ে গেলে, আপনি একে অপরের সাথে যোগাযোগ করার প্রয়োজন এমন যেকোনো থ্রেডের জন্য চ্যানেল ব্যবহার করতে পারেন, যেমন একটি চ্যাট সিস্টেম বা এমন একটি সিস্টেম যেখানে অনেক থ্রেড একটি গণনার অংশ সম্পাদন করে এবং ফলাফলগুলো একত্রিত করে এমন একটি থ্রেডে অংশগুলো পাঠায়।
প্রথমে, তালিকা ১৬-৬-এ, আমরা একটি চ্যানেল তৈরি করব কিন্তু এটি দিয়ে কিছুই করব না। মনে রাখবেন যে এটি এখনও কম্পাইল হবে না কারণ Rust বলতে পারে না যে আমরা চ্যানেলের মাধ্যমে কোন ধরনের ভ্যালু পাঠাতে চাই।
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
আমরা mpsc::channel
ফাংশন ব্যবহার করে একটি নতুন চ্যানেল তৈরি করি; mpsc
মানে হলো multiple producer, single consumer (একাধিক উৎপাদক, একক ভোক্তা)। সংক্ষেপে, Rust-এর স্ট্যান্ডার্ড লাইব্রেরি যেভাবে চ্যানেলগুলো প্রয়োগ করে তার মানে হলো একটি চ্যানেলের একাধিক প্রেরণকারী প্রান্ত থাকতে পারে যা ভ্যালু তৈরি করে কিন্তু শুধুমাত্র একটি গ্রহণকারী প্রান্ত থাকে যা সেই ভ্যালুগুলো গ্রহণ করে। কল্পনা করুন একাধিক স্রোত একসাথে একটি বড় নদীতে প্রবাহিত হচ্ছে: যেকোনো স্রোতে পাঠানো সবকিছু শেষে একটি নদীতেই শেষ হবে। আমরা আপাতত একটি একক প্রডিউসার দিয়ে শুরু করব, কিন্তু এই উদাহরণটি কাজ করলে আমরা একাধিক প্রডিউসার যোগ করব।
mpsc::channel
ফাংশনটি একটি টাপল (tuple) রিটার্ন করে, যার প্রথম উপাদানটি হলো প্রেরক প্রান্ত—ট্রান্সমিটার—এবং দ্বিতীয় উপাদানটি হলো প্রাপক প্রান্ত—রিসিভার। tx
এবং rx
সংক্ষেপণগুলো ঐতিহ্যগতভাবে অনেক ক্ষেত্রে যথাক্রমে ট্রান্সমিটার (transmitter) এবং রিসিভার (receiver) এর জন্য ব্যবহৃত হয়, তাই আমরা প্রতিটি প্রান্ত নির্দেশ করার জন্য আমাদের ভ্যারিয়েবলগুলোর নাম সেভাবেই রাখি। আমরা একটি let
স্টেটমেন্ট একটি প্যাটার্নসহ ব্যবহার করছি যা টাপলটিকে ডিস্ট্রাকচার (destructures) করে; আমরা অধ্যায় ১৯-এ let
স্টেটমেন্টে প্যাটার্নের ব্যবহার এবং ডিস্ট্রাকচারিং নিয়ে আলোচনা করব। আপাতত, জেনে রাখুন যে mpsc::channel
দ্বারা রিটার্ন করা টাপলের অংশগুলো বের করার জন্য এই পদ্ধতিতে একটি let
স্টেটমেন্ট ব্যবহার করা একটি সুবিধাজনক উপায়।
আসুন প্রেরক প্রান্তটিকে একটি স্পনড থ্রেডে منتقل করি এবং এটি একটি স্ট্রিং পাঠাক যাতে স্পনড থ্রেডটি মূল থ্রেডের সাথে যোগাযোগ করে, যেমনটি তালিকা ১৬-৭-এ দেখানো হয়েছে। এটি নদীর উজানে একটি রাবারের হাঁস রাখার মতো বা এক থ্রেড থেকে অন্য থ্রেডে একটি চ্যাট বার্তা পাঠানোর মতো।
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
আবারও, আমরা একটি নতুন থ্রেড তৈরি করতে thread::spawn
ব্যবহার করছি এবং তারপর tx
-কে ক্লোজারে منتقل করতে move
ব্যবহার করছি যাতে স্পনড থ্রেড tx
-এর মালিক হয়। স্পনড থ্রেডটিকে চ্যানেলের মাধ্যমে বার্তা পাঠাতে সক্ষম হওয়ার জন্য ট্রান্সমিটারের মালিক হতে হবে।
ট্রান্সমিটারের একটি send
মেথড আছে যা আমরা যে ভ্যালুটি পাঠাতে চাই তা নেয়। send
মেথডটি একটি Result<T, E>
টাইপ রিটার্ন করে, তাই যদি রিসিভারটি ইতিমধ্যে ড্রপ করা হয়ে থাকে এবং একটি ভ্যালু পাঠানোর কোনো জায়গা না থাকে, তবে সেন্ড অপারেশনটি একটি এরর রিটার্ন করবে। এই উদাহরণে, আমরা একটি এররের ক্ষেত্রে প্যানিক করার জন্য unwrap
কল করছি। কিন্তু একটি বাস্তব অ্যাপ্লিকেশনে, আমরা এটি সঠিকভাবে হ্যান্ডেল করব: সঠিক এরর হ্যান্ডলিংয়ের কৌশলগুলো পর্যালোচনা করতে অধ্যায় ৯-এ ফিরে যান।
তালিকা ১৬-৮-এ, আমরা মূল থ্রেডে রিসিভার থেকে ভ্যালুটি পাব। এটি নদীর শেষে জল থেকে রাবারের হাঁসটি উদ্ধার করার মতো বা একটি চ্যাট বার্তা গ্রহণ করার মতো।
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {received}"); }
রিসিভারের দুটি দরকারী মেথড আছে: recv
এবং try_recv
। আমরা recv
ব্যবহার করছি, যা receive এর সংক্ষিপ্ত রূপ, যা মূল থ্রেডের এক্সিকিউশন ব্লক করবে এবং চ্যানেলের মাধ্যমে একটি ভ্যালু পাঠানো না হওয়া পর্যন্ত অপেক্ষা করবে। একবার একটি ভ্যালু পাঠানো হলে, recv
এটি একটি Result<T, E>
-এ রিটার্ন করবে। যখন ট্রান্সমিটার বন্ধ হয়ে যায়, recv
একটি এরর রিটার্ন করে সংকেত দেবে যে আর কোনো ভ্যালু আসছে না।
try_recv
মেথডটি ব্লক করে না, বরং অবিলম্বে একটি Result<T, E>
রিটার্ন করবে: একটি Ok
ভ্যালু যাতে একটি বার্তা থাকে যদি একটি উপলব্ধ থাকে এবং একটি Err
ভ্যালু যদি এই সময়ে কোনো বার্তা না থাকে। try_recv
ব্যবহার করা দরকারী যদি এই থ্রেডের বার্তাগুলোর জন্য অপেক্ষা করার সময় অন্য কাজ করার থাকে: আমরা একটি লুপ লিখতে পারি যা প্রতি এতক্ষণ try_recv
কল করে, একটি বার্তা উপলব্ধ থাকলে তা হ্যান্ডেল করে, এবং অন্যথায় অন্য কাজ করে কিছুক্ষণ যতক্ষণ না আবার পরীক্ষা করা হয়।
আমরা এই উদাহরণে সরলতার জন্য recv
ব্যবহার করেছি; আমাদের মূল থ্রেডের জন্য বার্তাগুলোর জন্য অপেক্ষা করা ছাড়া অন্য কোনো কাজ নেই, তাই মূল থ্রেডটিকে ব্লক করা উপযুক্ত।
যখন আমরা তালিকা ১৬-৮-এর কোডটি চালাই, আমরা মূল থ্রেড থেকে প্রিন্ট করা ভ্যালুটি দেখতে পাব:
Got: hi
পারফেক্ট!
চ্যানেল এবং মালিকানা স্থানান্তর (Channels and Ownership Transference)
মালিকানার নিয়মগুলো মেসেজ পাঠানোর ক্ষেত্রে একটি গুরুত্বপূর্ণ ভূমিকা পালন করে কারণ এগুলো আপনাকে নিরাপদ, কনকারেন্ট কোড লিখতে সাহায্য করে। আপনার Rust প্রোগ্রামজুড়ে মালিকানা নিয়ে চিন্তা করার সুবিধা হলো কনকারেন্ট প্রোগ্রামিংয়ে ভুল প্রতিরোধ করা। আসুন একটি পরীক্ষা করি যাতে চ্যানেল এবং মালিকানা কীভাবে সমস্যা প্রতিরোধ করতে একসাথে কাজ করে তা দেখানো যায়: আমরা চ্যানেলের মাধ্যমে পাঠানোর পরে স্পনড থ্রেডে একটি val
ভ্যালু ব্যবহার করার চেষ্টা করব। তালিকা ১৬-৯-এর কোডটি কম্পাইল করার চেষ্টা করুন যাতে দেখা যায় কেন এই কোডটি অনুমোদিত নয়।
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
এখানে, আমরা tx.send
-এর মাধ্যমে চ্যানেলে পাঠানোর পর val
প্রিন্ট করার চেষ্টা করছি। এটি অনুমোদন করা একটি খারাপ ধারণা হবে: একবার ভ্যালুটি অন্য থ্রেডে পাঠানো হয়ে গেলে, সেই থ্রেডটি আমরা আবার ভ্যালুটি ব্যবহার করার চেষ্টা করার আগে এটিকে পরিবর্তন বা ড্রপ করতে পারে। সম্ভবত, অন্য থ্রেডের পরিবর্তনগুলো অসংগত বা অস্তিত্বহীন ডেটার কারণে ত্রুটি বা অপ্রত্যাশিত ফলাফলের কারণ হতে পারে। যাইহোক, যদি আমরা তালিকা ১৬-৯-এর কোডটি কম্পাইল করার চেষ্টা করি তবে Rust আমাদের একটি এরর দেয়:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:26
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
আমাদের কনকারেন্সি ভুল একটি কম্পাইল-টাইম এররের কারণ হয়েছে। send
ফাংশনটি তার প্যারামিটারের মালিকানা নেয়, এবং যখন ভ্যালুটি সরানো হয় তখন রিসিভার এটির মালিকানা নেয়। এটি আমাদের পাঠানোর পরে ঘটনাক্রমে আবার ভ্যালুটি ব্যবহার করা থেকে বিরত রাখে; মালিকানা সিস্টেম পরীক্ষা করে যে সবকিছু ঠিক আছে।
একাধিক ভ্যালু পাঠানো এবং রিসিভারকে অপেক্ষা করতে দেখা
তালিকা ১৬-৮-এর কোডটি কম্পাইল এবং রান হয়েছে, কিন্তু এটি স্পষ্টভাবে দেখায়নি যে দুটি পৃথক থ্রেড চ্যানেলের মাধ্যমে একে অপরের সাথে কথা বলছে।
তালিকা ১৬-১০-এ আমরা কিছু পরিবর্তন করেছি যা প্রমাণ করবে যে তালিকা ১৬-৮-এর কোডটি কনকারেন্টলি চলছে: স্পনড থ্রেডটি এখন একাধিক মেসেজ পাঠাবে এবং প্রতিটি মেসেজের মধ্যে এক সেকেন্ডের জন্য বিরতি দেবে।
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
এইবার, স্পনড থ্রেডে স্ট্রিংগুলোর একটি ভেক্টর আছে যা আমরা মূল থ্রেডে পাঠাতে চাই। আমরা সেগুলোর উপর ইটারেট করি, প্রতিটি পৃথকভাবে পাঠাই, এবং প্রতিটির মধ্যে এক সেকেন্ডের Duration
ভ্যালু দিয়ে thread::sleep
ফাংশন কল করে বিরতি দিই।
মূল থ্রেডে, আমরা আর স্পষ্টভাবে recv
ফাংশন কল করছি না: পরিবর্তে, আমরা rx
-কে একটি ইটারেটর হিসাবে ব্যবহার করছি। প্রতিটি প্রাপ্ত ভ্যালুর জন্য, আমরা এটি প্রিন্ট করছি। যখন চ্যানেলটি বন্ধ হয়ে যাবে, ইটারেশন শেষ হয়ে যাবে।
তালিকা ১৬-১০-এর কোডটি চালানোর সময়, আপনার প্রতিটি লাইনের মধ্যে এক-সেকেন্ডের বিরতিসহ নিম্নলিখিত আউটপুটটি দেখতে পাওয়া উচিত:
Got: hi
Got: from
Got: the
Got: thread
যেহেতু আমাদের মূল থ্রেডের for
লুপে কোনো কোড নেই যা বিরতি বা বিলম্ব করে, আমরা বলতে পারি যে মূল থ্রেডটি স্পনড থ্রেড থেকে ভ্যালু গ্রহণ করার জন্য অপেক্ষা করছে।
ট্রান্সমিটার ক্লোন করে একাধিক প্রডিউসার তৈরি করা
আগে আমরা উল্লেখ করেছি যে mpsc
হলো multiple producer, single consumer এর সংক্ষিপ্ত রূপ। আসুন mpsc
-কে কাজে লাগাই এবং তালিকা ১৬-১০-এর কোডটি প্রসারিত করে একাধিক থ্রেড তৈরি করি যা সবাই একই রিসিভারে ভ্যালু পাঠায়। আমরা ট্রান্সমিটার ক্লোন করে এটি করতে পারি, যেমনটি তালিকা ১৬-১১-এ দেখানো হয়েছে।
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
এইবার, আমরা প্রথম স্পনড থ্রেড তৈরি করার আগে, আমরা ট্রান্সমিটারের উপর clone
কল করি। এটি আমাদের একটি নতুন ট্রান্সমিটার দেবে যা আমরা প্রথম স্পনড থ্রেডে পাস করতে পারি। আমরা আসল ট্রান্সমিটারটি একটি দ্বিতীয় স্পনড থ্রেডে পাস করি। এটি আমাদের দুটি থ্রেড দেয়, প্রতিটি একটি রিসিভারে ভিন্ন ভিন্ন মেসেজ পাঠায়।
যখন আপনি কোডটি চালান, আপনার আউটপুটটি এইরকম কিছু দেখতে হবে:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
আপনার সিস্টেমের উপর নির্ভর করে আপনি ভ্যালুগুলো অন্য ক্রমে দেখতে পারেন। এটিই কনকারেন্সি কে আকর্ষণীয় এবং কঠিন করে তোলে। আপনি যদি thread::sleep
নিয়ে পরীক্ষা করেন, বিভিন্ন থ্রেডে বিভিন্ন ভ্যালু দেন, প্রতিটি রান আরও নন-ডিটারমিনিস্টিক হবে এবং প্রতিবার ভিন্ন আউটপুট তৈরি করবে।
এখন যেহেতু আমরা দেখেছি চ্যানেলগুলো কীভাবে কাজ করে, আসুন কনকারেন্সির একটি ভিন্ন পদ্ধতি দেখি।