Async দিয়ে কনকারেন্সি প্রয়োগ
এই বিভাগে, আমরা চ্যাপ্টার ১৬-তে থ্রেড (thread) দিয়ে সমাধান করা কিছু কনকারেন্সি চ্যালেঞ্জের জন্য async প্রয়োগ করব। যেহেতু আমরা সেখানে অনেক মূল ধারণা নিয়ে ইতিমধ্যে আলোচনা করেছি, তাই এই বিভাগে আমরা থ্রেড এবং future-এর মধ্যে কী কী পার্থক্য রয়েছে তার উপর মনোযোগ দেব।
অনেক ক্ষেত্রে, async ব্যবহার করে কনকারেন্সির সাথে কাজ করার জন্য API-গুলো থ্রেড ব্যবহারের জন্য থাকা API-গুলোর মতোই। অন্য ক্ষেত্রে, সেগুলো বেশ ভিন্ন হয়ে যায়। এমনকি যখন API-গুলো থ্রেড এবং async-এর মধ্যে দেখতে একই রকম মনে হয়, তখনও তাদের আচরণ প্রায়শই ভিন্ন হয়—এবং তাদের পারফরম্যান্স বৈশিষ্ট্যগুলি প্রায় সবসময়ই ভিন্ন থাকে।
spawn_task
দিয়ে নতুন টাস্ক তৈরি করা
Creating a New Thread with Spawn-এ আমরা প্রথম যে অপারেশনটি নিয়ে কাজ করেছিলাম তা হলো দুটি পৃথক থ্রেডে গণনা করা। আসুন async ব্যবহার করে একই কাজ করি। trpl
crate একটি spawn_task
ফাংশন সরবরাহ করে যা দেখতে thread::spawn
API-এর মতোই, এবং একটি sleep
ফাংশন যা thread::sleep
API-এর একটি async সংস্করণ। আমরা এই দুটিকে একসাথে ব্যবহার করে গণনার উদাহরণটি বাস্তবায়ন করতে পারি, যেমনটি লিস্টিং ১৭-৬-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }); }
আমাদের সূচনা বিন্দু হিসাবে, আমরা আমাদের main
ফাংশনটি trpl::run
দিয়ে সেট আপ করি যাতে আমাদের টপ-লেভেল ফাংশনটি async হতে পারে।
দ্রষ্টব্য: এই অধ্যায়ের এই পয়েন্ট থেকে, প্রতিটি উদাহরণে
main
-এtrpl::run
সহ এই একই র্যাপিং কোড অন্তর্ভুক্ত থাকবে, তাই আমরা প্রায়শই এটি এড়িয়ে যাব যেমনটি আমরাmain
-এর ক্ষেত্রে করি। আপনার কোডে এটি অন্তর্ভুক্ত করতে ভুলবেন না!
তারপর আমরা সেই ব্লকের মধ্যে দুটি লুপ লিখি, প্রতিটিতে একটি trpl::sleep
কল রয়েছে, যা পরবর্তী বার্তা পাঠানোর আগে আধা সেকেন্ড (৫০০ মিলিসেকেন্ড) অপেক্ষা করে। আমরা একটি লুপ trpl::spawn_task
-এর বডিতে এবং অন্যটি একটি টপ-লেভেল for
লুপে রাখি। আমরা sleep
কলগুলোর পরে একটি await
যোগ করি।
এই কোডটি থ্রেড-ভিত্তিক ইমপ্লিমেন্টেশনের মতোই আচরণ করে—যার মধ্যে এই ঘটনাটিও অন্তর্ভুক্ত যে আপনি যখন এটি চালাবেন তখন আপনার নিজের টার্মিনালে বার্তাগুলি একটি ভিন্ন ক্রমে উপস্থিত হতে পারে:
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!```
এই সংস্করণটি প্রধান async ব্লকের বডিতে `for` লুপ শেষ হওয়ার সাথে সাথেই থেমে যায়, কারণ `spawn_task` দ্বারা তৈরি করা টাস্কটি `main` ফাংশন শেষ হলে বন্ধ হয়ে যায়। আপনি যদি এটিকে টাস্কের সমাপ্তি পর্যন্ত চালাতে চান, তবে প্রথম টাস্কটি সম্পূর্ণ হওয়ার জন্য অপেক্ষা করতে আপনাকে একটি জয়েন হ্যান্ডেল ব্যবহার করতে হবে। থ্রেডের সাথে, আমরা থ্রেডটি চলা শেষ না হওয়া পর্যন্ত "ব্লক" করার জন্য `join` মেথড ব্যবহার করেছি। লিস্টিং ১৭-৭-এ, আমরা একই কাজ করার জন্য `await` ব্যবহার করতে পারি, কারণ টাস্ক হ্যান্ডেল নিজেই একটি future। এর `Output` টাইপ একটি `Result`, তাই আমরা এটিকে await করার পরে `unwrap` করি।
<Listing number="17-7" caption="একটি টাস্ককে সমাপ্তি পর্যন্ত চালানোর জন্য একটি জয়েন হ্যান্ডেলের সাথে `await` ব্যবহার করা" file-name="src/main.rs">
```rust
# extern crate trpl; // required for mdbook test
#
# use std::time::Duration;
#
# fn main() {
# trpl::run(async {
let handle = trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
handle.await.unwrap();
# });
# }
এই আপডেট করা সংস্করণটি উভয় লুপ শেষ না হওয়া পর্যন্ত চলে।
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
এখন পর্যন্ত, মনে হচ্ছে async এবং থ্রেড আমাদের একই মৌলিক ফলাফল দেয়, শুধু ভিন্ন সিনট্যাক্স দিয়ে: জয়েন হ্যান্ডেলে join
কল করার পরিবর্তে await
ব্যবহার করা, এবং sleep
কলগুলিকে await করা।
বড় পার্থক্য হলো এই কাজটি করার জন্য আমাদের অন্য একটি অপারেটিং সিস্টেম থ্রেড তৈরি করার প্রয়োজন হয়নি। আসলে, আমাদের এখানে একটি টাস্কও তৈরি করার প্রয়োজন নেই। যেহেতু async ব্লকগুলি নামহীন future-এ কম্পাইল হয়, আমরা প্রতিটি লুপকে একটি async ব্লকে রাখতে পারি এবং রানটাইমকে trpl::join
ফাংশন ব্যবহার করে সেগুলিকে সম্পূর্ণ করতে দিতে পারি।
Waiting for All Threads to Finishing Using join
Handles বিভাগে, আমরা দেখিয়েছিলাম কীভাবে std::thread::spawn
কল করার সময় রিটার্ন করা JoinHandle
টাইপের উপর join
মেথড ব্যবহার করতে হয়। trpl::join
ফাংশনটি একই রকম, কিন্তু future-এর জন্য। যখন আপনি এটিকে দুটি future দেন, তখন এটি একটি নতুন future তৈরি করে যার আউটপুট হলো একটি টাপল (tuple) যা আপনার পাস করা প্রতিটি future-এর আউটপুট ধারণ করে যখন তারা উভয়ই সম্পূর্ণ হয়। সুতরাং, লিস্টিং ১৭-৮-এ, আমরা fut1
এবং fut2
উভয়ই শেষ হওয়ার জন্য অপেক্ষা করতে trpl::join
ব্যবহার করি। আমরা fut1
এবং fut2
-কে await করি না, বরং trpl::join
দ্বারা উৎপাদিত নতুন future-কে await করি। আমরা আউটপুট উপেক্ষা করি, কারণ এটি কেবল দুটি ইউনিট মান ধারণকারী একটি টাপল।
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let fut1 = async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }; let fut2 = async { for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }; trpl::join(fut1, fut2).await; }); }
যখন আমরা এটি চালাই, আমরা দেখি উভয় future-ই সম্পূর্ণভাবে চলে:
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
এখন, আপনি প্রতিবার ঠিক একই ক্রম দেখতে পাবেন, যা আমরা থ্রেডের সাথে যা দেখেছিলাম তার থেকে খুব ভিন্ন। এর কারণ হলো trpl::join
ফাংশনটি ফেয়ার (fair), যার অর্থ এটি প্রতিটি future-কে সমানভাবে পরীক্ষা করে, তাদের মধ্যে পর্যায়ক্রমে পরিবর্তন করে, এবং অন্যটি প্রস্তুত থাকলে একটিকে এগিয়ে যেতে দেয় না। থ্রেডের সাথে, অপারেটিং সিস্টেম সিদ্ধান্ত নেয় কোন থ্রেডটি পরীক্ষা করতে হবে এবং এটিকে কতক্ষণ চালাতে দিতে হবে। async রাস্টের সাথে, রানটাইম সিদ্ধান্ত নেয় কোন টাস্কটি পরীক্ষা করতে হবে। (বাস্তবে, বিশদ বিবরণগুলি জটিল হয়ে যায় কারণ একটি async রানটাইম কনকারেন্সি পরিচালনার অংশ হিসাবে পর্দার আড়ালে অপারেটিং সিস্টেম থ্রেড ব্যবহার করতে পারে, তাই ফেয়ারনেস নিশ্চিত করা একটি রানটাইমের জন্য আরও বেশি কাজ হতে পারে—তবে এটি এখনও সম্ভব!) রানটাইমগুলিকে যেকোনো প্রদত্ত অপারেশনের জন্য ফেয়ারনেসের গ্যারান্টি দিতে হয় না, এবং তারা প্রায়শই আপনাকে ফেয়ারনেস চান কি না তা বেছে নিতে বিভিন্ন API অফার করে।
ফিউচার await করার এই ভিন্নতাগুলো চেষ্টা করে দেখুন এবং দেখুন তারা কী করে:
- যেকোনো একটি বা উভয় লুপের চারপাশ থেকে async ব্লকটি সরিয়ে ফেলুন।
- প্রতিটি async ব্লক সংজ্ঞায়িত করার সাথে সাথেই await করুন।
- কেবলমাত্র প্রথম লুপটিকে একটি async ব্লকে র্যাপ করুন, এবং দ্বিতীয় লুপের বডির পরে ফলাফলস্বরূপ future-টিকে await করুন।
একটি অতিরিক্ত চ্যালেঞ্জের জন্য, দেখুন আপনি প্রতিটি ক্ষেত্রে কোড চালানোর আগে আউটপুট কী হবে তা বের করতে পারেন কিনা!
মেসেজ পাসিং ব্যবহার করে দুটি টাস্কে গণনা করা
Future-এর মধ্যে ডেটা শেয়ার করাও পরিচিত মনে হবে: আমরা আবার মেসেজ পাসিং ব্যবহার করব, কিন্তু এবার async সংস্করণ টাইপ এবং ফাংশনের সাথে। আমরা Using Message Passing to Transfer Data Between Threads-এ যে পথ নিয়েছিলাম তার থেকে কিছুটা ভিন্ন পথ নেব যাতে থ্রেড-ভিত্তিক এবং future-ভিত্তিক কনকারেন্সির মধ্যে কিছু মূল পার্থক্য তুলে ধরা যায়। লিস্টিং ১৭-৯-এ, আমরা কেবল একটি async ব্লক দিয়ে শুরু করব—একটি পৃথক থ্রেড তৈরি করার মতো করে একটি পৃথক টাস্ক তৈরি না করে।
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let val = String::from("hi"); tx.send(val).unwrap(); let received = rx.recv().await.unwrap(); println!("received '{received}'"); }); }
এখানে, আমরা trpl::channel
ব্যবহার করি, যা চ্যাপ্টার ১৬-তে আমরা থ্রেডের সাথে ব্যবহার করা মাল্টিপল-প্রডিউসার, সিঙ্গেল-কনজিউমার চ্যানেল API-এর একটি async সংস্করণ। API-এর async সংস্করণটি থ্রেড-ভিত্তিক সংস্করণ থেকে সামান্য ভিন্ন: এটি একটি অপরিবর্তনীয় রিসিভার rx
-এর পরিবর্তে একটি পরিবর্তনযোগ্য রিসিভার ব্যবহার করে, এবং এর recv
মেথড সরাসরি মান তৈরি করার পরিবর্তে একটি future তৈরি করে যা আমাদের await করতে হবে। এখন আমরা সেন্ডার থেকে রিসিভারে মেসেজ পাঠাতে পারি। লক্ষ্য করুন যে আমাদের একটি পৃথক থ্রেড বা এমনকি একটি টাস্কও তৈরি করতে হবে না; আমাদের কেবল rx.recv
কলটি await করতে হবে।
std::mpsc::channel
-এর সিঙ্ক্রোনাস Receiver::recv
মেথডটি একটি মেসেজ না পাওয়া পর্যন্ত ব্লক করে। trpl::Receiver::recv
মেথডটি তা করে না, কারণ এটি async। ব্লক করার পরিবর্তে, এটি রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দেয় যতক্ষণ না একটি মেসেজ পাওয়া যায় বা চ্যানেলের সেন্ড সাইড বন্ধ হয়ে যায়। এর বিপরীতে, আমরা send
কলটি await করি না, কারণ এটি ব্লক করে না। এটির প্রয়োজন নেই, কারণ আমরা যে চ্যানেলে পাঠাচ্ছি তা আনবাউন্ডেড (unbounded)।
দ্রষ্টব্য: যেহেতু এই সমস্ত async কোড একটি
trpl::run
কলের মধ্যে একটি async ব্লকে চলে, তাই এর মধ্যে সবকিছুই ব্লকিং এড়াতে পারে। যাইহোক, এর বাইরের কোডটিrun
ফাংশন রিটার্ন করার উপর ব্লক করবে। এটাইtrpl::run
ফাংশনের মূল উদ্দেশ্য: এটি আপনাকে বেছে নিতে দেয় কোথায় কিছু async কোডের সেটের উপর ব্লক করতে হবে, এবং এইভাবে সিঙ্ক এবং async কোডের মধ্যে কোথায় রূপান্তর করতে হবে। বেশিরভাগ async রানটাইমে,run
-এর নাম আসলে ঠিক এই কারণেইblock_on
।
এই উদাহরণ সম্পর্কে দুটি জিনিস লক্ষ্য করুন। প্রথমত, বার্তাটি সাথে সাথেই পৌঁছে যাবে। দ্বিতীয়ত, যদিও আমরা এখানে একটি future ব্যবহার করি, এখনও কোনো কনকারেন্সি নেই। লিস্টিংয়ের সবকিছু ক্রমানুসারে ঘটে, ঠিক যেমনটি হতো যদি কোনো future জড়িত না থাকত।
আসুন প্রথম অংশটি মোকাবেলা করি একটি সিরিজের বার্তা পাঠিয়ে এবং তাদের মধ্যে sleep
করে, যেমনটি লিস্টিং ১৭-১০-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
});
}
বার্তা পাঠানোর পাশাপাশি, আমাদের সেগুলি গ্রহণ করতে হবে। এই ক্ষেত্রে, যেহেতু আমরা জানি কতগুলি বার্তা আসছে, আমরা চারবার rx.recv().await
কল করে এটি ম্যানুয়ালি করতে পারতাম। তবে বাস্তব জগতে, আমরা সাধারণত কিছু অজানা সংখ্যক বার্তার জন্য অপেক্ষা করব, তাই আমাদের আর কোনো বার্তা নেই তা নির্ধারণ না করা পর্যন্ত অপেক্ষা করতে হবে।
লিস্টিং ১৬-১০-এ, আমরা একটি সিঙ্ক্রোনাস চ্যানেল থেকে প্রাপ্ত সমস্ত আইটেম প্রসেস করার জন্য একটি for
লুপ ব্যবহার করেছি। তবে রাস্টের কাছে এখনও আইটেমের একটি অ্যাসিঙ্ক্রোনাস সিরিজের উপর for
লুপ লেখার কোনো উপায় নেই, তাই আমাদের এমন একটি লুপ ব্যবহার করতে হবে যা আমরা আগে দেখিনি: while let
কন্ডিশনাল লুপ। এটি হলো if let
কনস্ট্রাক্টের লুপ সংস্করণ যা আমরা Concise Control Flow with if let
and let else
বিভাগে দেখেছি। লুপটি ততক্ষণ চলতে থাকবে যতক্ষণ এর নির্দিষ্ট করা প্যাটার্নটি মানের সাথে মিলতে থাকবে।
rx.recv
কল একটি future তৈরি করে, যা আমরা await করি। রানটাইম future-টিকে প্রস্তুত না হওয়া পর্যন্ত পজ (pause) করবে। একবার একটি বার্তা এসে গেলে, future-টি যতবার একটি বার্তা আসবে ততবার Some(message)
-এ রিজলভ হবে। যখন চ্যানেলটি বন্ধ হয়ে যাবে, কোনো বার্তা এসেছে কি না নির্বিশেষে, future-টি পরিবর্তে None
রিজলভ করবে এটি নির্দেশ করার জন্য যে আর কোনো মান নেই এবং তাই আমাদের পোলিং বন্ধ করা উচিত—অর্থাৎ, await করা বন্ধ করা উচিত।
while let
লুপ এই সবকিছুকে একত্রিত করে। যদি rx.recv().await
কল করার ফলাফল Some(message)
হয়, আমরা বার্তাটিতে অ্যাক্সেস পাই এবং আমরা এটি লুপের বডিতে ব্যবহার করতে পারি, যেমনটি আমরা if let
-এর সাথে করতে পারতাম। যদি ফলাফলটি None
হয়, লুপটি শেষ হয়ে যায়। প্রতিবার লুপটি সম্পূর্ণ হলে, এটি আবার await পয়েন্টে আঘাত করে, তাই রানটাইম এটিকে আবার পজ করে যতক্ষণ না অন্য একটি বার্তা আসে।
কোডটি এখন সফলভাবে সমস্ত বার্তা পাঠায় এবং গ্রহণ করে। দুর্ভাগ্যবশত, এখনও কয়েকটি সমস্যা রয়েছে। একটি হলো, বার্তাগুলি আধা-সেকেন্ডের ব্যবধানে আসে না। প্রোগ্রাম শুরু করার ২ সেকেন্ড (২,০০০ মিলিসেকেন্ড) পরে সেগুলি একবারে আসে। আরেকটি হলো, এই প্রোগ্রামটি কখনও শেষ হয় না! পরিবর্তে, এটি নতুন বার্তার জন্য চিরকাল অপেক্ষা করে। আপনাকে ctrl-c ব্যবহার করে এটি বন্ধ করতে হবে।
আসুন প্রথমে পরীক্ষা করে দেখি কেন বার্তাগুলি পুরো বিলম্বের পরে একবারে আসে, প্রতিটিটির মধ্যে বিলম্ব সহ আসার পরিবর্তে। একটি নির্দিষ্ট async ব্লকের মধ্যে, কোডে await
কিওয়ার্ডগুলি যে ক্রমে উপস্থিত হয়, প্রোগ্রাম চলার সময় সেগুলি সেই ক্রমেই কার্যকর হয়।
লিস্টিং ১৭-১০-এ কেবল একটি async ব্লক রয়েছে, তাই এর মধ্যে সবকিছু রৈখিকভাবে (linearly) চলে। এখনও কোনো কনকারেন্সি নেই। সমস্ত tx.send
কল ঘটে, যার মধ্যে সমস্ত trpl::sleep
কল এবং তাদের সংশ্লিষ্ট await পয়েন্টগুলি মিশে থাকে। শুধুমাত্র তারপরেই while let
লুপ recv
কলগুলির কোনো await
পয়েন্টের মধ্য দিয়ে যেতে পারে।
আমরা যে আচরণটি চাই তা পেতে, যেখানে প্রতিটি বার্তার মধ্যে sleep
বিলম্ব ঘটে, আমাদের tx
এবং rx
অপারেশনগুলিকে তাদের নিজস্ব async ব্লকে রাখতে হবে, যেমনটি লিস্টিং ১৭-১১-এ দেখানো হয়েছে। তারপরে রানটাইম trpl::join
ব্যবহার করে সেগুলির প্রতিটি আলাদাভাবে চালাতে পারে, ঠিক গণনার উদাহরণের মতো। আবারও, আমরা trpl::join
কল করার ফলাফল await করি, পৃথক future-গুলিকে নয়। যদি আমরা পৃথক future-গুলিকে ক্রমানুসারে await করতাম, আমরা কেবল একটি ক্রমিক প্রবাহে ফিরে আসতাম—ঠিক যা আমরা না করার চেষ্টা করছি।
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
লিস্টিং ১৭-১১-এর আপডেট করা কোড দিয়ে, বার্তাগুলি ২ সেকেন্ড পরে একবারে আসার পরিবর্তে ৫০০-মিলিসেকেন্ডের ব্যবধানে প্রিন্ট হয়।
প্রোগ্রামটি এখনও কখনও শেষ হয় না, কারণ while let
লুপটি trpl::join
-এর সাথে যেভাবে ইন্টারঅ্যাক্ট করে তার জন্য:
trpl::join
থেকে রিটার্ন করা future-টি তখনই সম্পূর্ণ হয় যখন এটিতে পাস করা উভয় future সম্পূর্ণ হয়।tx
future-টিvals
-এর শেষ বার্তাটি পাঠানোর পরে স্লিপ করা শেষ হলে সম্পূর্ণ হয়।rx
future-টিwhile let
লুপ শেষ না হওয়া পর্যন্ত সম্পূর্ণ হবে না।while let
লুপটিrx.recv
await করাNone
তৈরি না করা পর্যন্ত শেষ হবে না।rx.recv
await করা তখনইNone
রিটার্ন করবে যখন চ্যানেলের অন্য প্রান্তটি বন্ধ হয়ে যাবে।- চ্যানেলটি তখনই বন্ধ হবে যদি আমরা
rx.close
কল করি বা যখন সেন্ডার সাইড,tx
, ড্রপ (drop) হয়ে যায়। - আমরা কোথাও
rx.close
কল করি না, এবংtx
ড্রপ হবে না যতক্ষণ নাtrpl::run
-কে পাস করা সবচেয়ে বাইরের async ব্লকটি শেষ হয়। - ব্লকটি শেষ হতে পারে না কারণ এটি
trpl::join
সম্পূর্ণ হওয়ার উপর ব্লক হয়ে আছে, যা আমাদের এই তালিকার শীর্ষে ফিরিয়ে নিয়ে যায়।
আমরা rx.close
কল করে ম্যানুয়ালি rx
বন্ধ করতে পারতাম, কিন্তু এর খুব একটা মানে হয় না। একটি নির্বিচারে সংখ্যক বার্তা পরিচালনা করার পরে থামলে প্রোগ্রামটি বন্ধ হয়ে যাবে, কিন্তু আমরা বার্তা মিস করতে পারি। আমাদের নিশ্চিত করার জন্য অন্য কোনো উপায় প্রয়োজন যাতে tx
ফাংশনটির শেষ হওয়ার আগে ড্রপ হয়ে যায়।
এখন, আমরা যে async ব্লকে বার্তা পাঠাই তা কেবল tx
ধার (borrow) করে কারণ একটি বার্তা পাঠানোর জন্য মালিকানার (ownership) প্রয়োজন হয় না, কিন্তু যদি আমরা tx
-কে সেই async ব্লকের মধ্যে মুভ (move) করতে পারতাম, তবে সেই ব্লকটি শেষ হয়ে গেলে এটি ড্রপ হয়ে যেত। চ্যাপ্টার ১৩-এর Capturing References or Moving Ownership বিভাগে, আপনি শিখেছেন কীভাবে ক্লোজারের সাথে move
কিওয়ার্ড ব্যবহার করতে হয়, এবং, চ্যাপ্টার ১৬-এর Using move
Closures with Threads বিভাগে আলোচনা করা হয়েছে, থ্রেডের সাথে কাজ করার সময় আমাদের প্রায়শই ক্লোজারের মধ্যে ডেটা মুভ করতে হয়। একই মৌলিক গতিবিদ্যা async ব্লকগুলিতেও প্রযোজ্য, তাই move
কিওয়ার্ড async ব্লকগুলির সাথে ঠিক সেভাবেই কাজ করে যেমনটি ক্লোজারের সাথে করে।
লিস্টিং ১৭-১২-এ, আমরা বার্তা পাঠানোর জন্য ব্যবহৃত ব্লকটিকে async
থেকে async move
-এ পরিবর্তন করি। যখন আমরা কোডের এই সংস্করণটি চালাই, তখন শেষ বার্তাটি পাঠানো এবং গ্রহণ করার পরে এটি সুন্দরভাবে বন্ধ হয়ে যায়।
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; trpl::join(tx_fut, rx_fut).await; }); }
এই async চ্যানেলটিও একটি মাল্টিপল-প্রডিউসার চ্যানেল, তাই আমরা যদি একাধিক future থেকে বার্তা পাঠাতে চাই তবে tx
-এর উপর clone
কল করতে পারি, যেমনটি লিস্টিং ১৭-১৩-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(1500)).await; } }; trpl::join3(tx1_fut, tx_fut, rx_fut).await; }); }
প্রথমে, আমরা tx
ক্লোন করি, প্রথম async ব্লকের বাইরে tx1
তৈরি করি। আমরা tx1
-কে সেই ব্লকের মধ্যে মুভ করি ঠিক যেমনটি আমরা আগে tx
-এর সাথে করেছি। তারপরে, পরে, আমরা মূল tx
-কে একটি নতুন async ব্লকে মুভ করি, যেখানে আমরা সামান্য ধীর গতিতে আরও বার্তা পাঠাই। আমরা ঘটনাচক্রে এই নতুন async ব্লকটি বার্তা গ্রহণের জন্য async ব্লকের পরে রাখি, তবে এটি এর আগেও থাকতে পারত। মূল বিষয় হলো future-গুলি কোন ক্রমে await করা হয়, কোন ক্রমে সেগুলি তৈরি করা হয় তা নয়।
বার্তা পাঠানোর জন্য উভয় async ব্লককেই async move
ব্লক হতে হবে যাতে tx
এবং tx1
উভয়ই সেই ব্লকগুলি শেষ হলে ড্রপ হয়ে যায়। অন্যথায়, আমরা সেই একই অসীম লুপে ফিরে যাব যা দিয়ে আমরা শুরু করেছিলাম। অবশেষে, আমরা অতিরিক্ত future-টি পরিচালনা করার জন্য trpl::join
থেকে trpl::join3
-এ স্যুইচ করি।
এখন আমরা উভয় সেন্ডিং future থেকে সমস্ত বার্তা দেখতে পাই, এবং যেহেতু সেন্ডিং future-গুলি পাঠানোর পরে সামান্য ভিন্ন বিলম্ব ব্যবহার করে, বার্তাগুলিও সেই ভিন্ন ব্যবধানে গৃহীত হয়।
received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'
এটি একটি ভালো শুরু, কিন্তু এটি আমাদের কেবল কয়েকটি future-এর মধ্যে সীমাবদ্ধ রাখে: join
-এর সাথে দুটি, বা join3
-এর সাথে তিনটি। আসুন দেখি আমরা কীভাবে আরও future-এর সাথে কাজ করতে পারি।