যেকোনো সংখ্যক ফিউচারের সাথে কাজ করা
পূর্ববর্তী বিভাগে যখন আমরা দুটি ফিউচার থেকে তিনটি ফিউচারে স্যুইচ করেছি, তখন আমাদের join
থেকে join3
ব্যবহার করতে হয়েছিল। আমরা যতবার ফিউচারের সংখ্যা পরিবর্তন করব, ততবার একটি ভিন্ন ফাংশন কল করতে হলে তা বিরক্তিকর হতো। আনন্দের বিষয়, আমাদের কাছে join
-এর একটি ম্যাক্রো ফর্ম রয়েছে যেখানে আমরা ইচ্ছামত আর্গুমেন্ট পাস করতে পারি। এটি ফিউচারগুলোকে await করার কাজও নিজেই করে। সুতরাং, আমরা লিস্টিং 17-13 থেকে কোডটি join3
-এর পরিবর্তে join!
ব্যবহার করে পুনরায় লিখতে পারি, যেমনটি লিস্টিং 17-14-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; trpl::join!(tx1_fut, tx_fut, rx_fut); }); }
এটি অবশ্যই join
এবং join3
এবং join4
ইত্যাদির মধ্যে অদলবদল করার চেয়ে একটি উন্নতি! যাইহোক, এমনকি এই ম্যাক্রো ফর্মটি কেবল তখনই কাজ করে যখন আমরা আগে থেকে ফিউচারের সংখ্যা জানি। কিন্তু বাস্তব-জগতের রাস্ট কোডে, ফিউচারগুলোকে একটি কালেকশনে পুশ করা এবং তারপরে তাদের কিছু বা সমস্ত ফিউচার সম্পূর্ণ হওয়ার জন্য অপেক্ষা করা একটি সাধারণ প্যাটার্ন।
কোনো কালেকশনের সমস্ত ফিউচার পরীক্ষা করার জন্য, আমাদের সেগুলোর সবগুলোর উপর ইটারেট করতে হবে এবং জয়েন করতে হবে। trpl::join_all
ফাংশনটি যেকোনো টাইপ গ্রহণ করে যা Iterator
ট্রেইট ইমপ্লিমেন্ট করে, যা আপনি চ্যাপ্টার ১৩-এর The Iterator Trait and the next
Method-এ শিখেছেন, তাই এটি ঠিক কাজের জিনিস বলে মনে হচ্ছে। আসুন আমাদের ফিউচারগুলোকে একটি ভেক্টরে রাখি এবং join!
-কে join_all
দিয়ে প্রতিস্থাপন করার চেষ্টা করি যেমনটি লিস্টিং 17-15-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures = vec![tx1_fut, rx_fut, tx_fut];
trpl::join_all(futures).await;
});
}
দুর্ভাগ্যবশত, এই কোডটি কম্পাইল হয় না। পরিবর্তে, আমরা এই ত্রুটিটি পাই:
error[E0308]: mismatched types
--> src/main.rs:45:37
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
45 | let futures = vec![tx1_fut, rx_fut, tx_fut];
| ^^^^^^ expected `async` block, found a different `async` block
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
এটি আশ্চর্যজনক হতে পারে। সর্বোপরি, কোনো async ব্লকই কিছু রিটার্ন করে না, তাই প্রতিটি একটি Future<Output = ()>
তৈরি করে। মনে রাখবেন যে Future
একটি ট্রেইট, এবং কম্পাইলার প্রতিটি async ব্লকের জন্য একটি অনন্য enum তৈরি করে। আপনি একটি Vec
-এ দুটি ভিন্ন হাতে লেখা struct রাখতে পারবেন না, এবং একই নিয়ম কম্পাইলার দ্বারা জেনারেট করা বিভিন্ন enum-এর ক্ষেত্রেও প্রযোজ্য।
এটি কাজ করানোর জন্য, আমাদের ট্রেইট অবজেক্ট ব্যবহার করতে হবে, ঠিক যেমনটি আমরা চ্যাপ্টার ১২-এর “Returning Errors from the run function”-এ করেছিলাম। (আমরা চ্যাপ্টার ১৮-এ ট্রেইট অবজেক্ট নিয়ে বিস্তারিত আলোচনা করব।) ট্রেইট অবজেক্ট ব্যবহার করে আমরা এই টাইপগুলো দ্বারা উৎপাদিত প্রতিটি নামহীন ফিউচারকে একই টাইপ হিসাবে বিবেচনা করতে পারি, কারণ সেগুলির সবগুলোই Future
ট্রেইট ইমপ্লিমেন্ট করে।
দ্রষ্টব্য: চ্যাপ্টার ৮-এর Using an Enum to Store Multiple Values-এ, আমরা একটি
Vec
-এ একাধিক টাইপ অন্তর্ভুক্ত করার আরেকটি উপায় নিয়ে আলোচনা করেছি: ভেক্টরে উপস্থিত হতে পারে এমন প্রতিটি টাইপকে উপস্থাপন করার জন্য একটি enum ব্যবহার করা। তবে, আমরা এখানে তা করতে পারি না। একটি কারণ হলো, আমাদের বিভিন্ন টাইপের নামকরণ করার কোনো উপায় নেই, কারণ সেগুলি নামহীন। আরেকটি কারণ হলো, আমরা একটি ভেক্টর এবংjoin_all
ব্যবহার করার মূল কারণটি ছিল ফিউচারের একটি ডাইনামিক কালেকশনের সাথে কাজ করতে পারা যেখানে আমরা কেবল তাদের একই আউটপুট টাইপ থাকা নিয়েই চিন্তা করি।
আমরা লিস্টিং 17-16-এ দেখানো হিসাবে vec!
-এর প্রতিটি ফিউচারকে একটি Box::new
-এ র্যাপ করে শুরু করি।
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
দুর্ভাগ্যবশত, এই কোডটি এখনও কম্পাইল হয় না। আসলে, আমরা দ্বিতীয় এবং তৃতীয় Box::new
কল উভয়ের জন্য আগের মতোই একই মৌলিক ত্রুটি পাই, সেইসাথে Unpin
ট্রেইট উল্লেখ করে নতুন ত্রুটিও পাই। আমরা এক মুহূর্তের মধ্যে Unpin
ত্রুটিগুলিতে ফিরে আসব। প্রথমে, আসুন futures
ভেরিয়েবলের টাইপটি স্পষ্টভাবে উল্লেখ করে Box::new
কলগুলিতে টাইপের ত্রুটিগুলি ঠিক করি (দেখুন লিস্টিং 17-17)।
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures: Vec<Box<dyn Future<Output = ()>>> =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
এই টাইপ ডিক্লারেশনটি একটু জটিল, তাই আসুন এটি ধাপে ধাপে দেখি:
- সবচেয়ে ভেতরের টাইপটি হলো ফিউচার নিজেই। আমরা স্পষ্টভাবে উল্লেখ করি যে ফিউচারের আউটপুট হলো ইউনিট টাইপ
()
যাFuture<Output = ()>
লিখে করা হয়েছে। - তারপর আমরা ট্রেইটটিকে ডাইনামিক হিসাবে চিহ্নিত করতে
dyn
দিয়ে টীকাবদ্ধ (annotate) করি। - পুরো ট্রেইট রেফারেন্সটি একটি
Box
-এ মোড়ানো হয়। - অবশেষে, আমরা স্পষ্টভাবে বলি যে
futures
হলো একটিVec
যা এই আইটেমগুলি ধারণ করে।
এটি ইতিমধ্যেই একটি বড় পার্থক্য তৈরি করেছে। এখন যখন আমরা কম্পাইলার চালাই, আমরা কেবল Unpin
উল্লেখ করা ত্রুটিগুলি পাই। যদিও তিনটি ত্রুটি আছে, তাদের বিষয়বস্তু খুব অনুরূপ।
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:24
|
49 | trpl::join_all(futures).await;
| -------------- ^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `join_all`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:105:14
|
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
| -------- required by a bound in this function
...
105 | I::Item: Future,
| ^^^^^^ required by this bound in `join_all`
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:9
|
49 | trpl::join_all(futures).await;
| ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:33
|
49 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
For more information about this error, try `rustc --explain E0277`.
error: could not compile `async_await` (bin "async_await") due to 3 previous errors
এটি হজম করার জন্য অনেক কিছু, তাই আসুন এটি ভেঙে দেখি। বার্তার প্রথম অংশটি আমাদের বলে যে প্রথম async ব্লকটি (src/main.rs:8:23: 20:10
) Unpin
ট্রেইট ইমপ্লিমেন্ট করে না এবং এটি সমাধান করার জন্য pin!
বা Box::pin
ব্যবহার করার পরামর্শ দেয়। অধ্যায়ের পরে, আমরা Pin
এবং Unpin
সম্পর্কে আরও কিছু বিশদ বিবরণে যাব। তবে এই মুহূর্তে, আমরা কেবল আটকে যাওয়া অবস্থা থেকে বের হতে কম্পাইলারের পরামর্শ অনুসরণ করতে পারি। লিস্টিং 17-18-এ, আমরা std::pin
থেকে Pin
ইমপোর্ট করে শুরু করি। এরপরে আমরা futures
-এর জন্য টাইপ অ্যানোটেশন আপডেট করি, প্রতিটি Box
-কে একটি Pin
দিয়ে র্যাপ করে। অবশেষে, আমরা ফিউচারগুলিকে পিন করার জন্য Box::pin
ব্যবহার করি।
extern crate trpl; // required for mdbook test use std::pin::Pin; // -- snip -- use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)]; trpl::join_all(futures).await; }); }
যদি আমরা এটি কম্পাইল করে চালাই, আমরা অবশেষে সেই আউটপুট পাব যা আমরা আশা করেছিলাম:
received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'
যাক বাবা!
এখানে আরও কিছু অন্বেষণ করার আছে। একটি বিষয় হলো, Pin<Box<T>>
ব্যবহার করা এই ফিউচারগুলিকে Box
দিয়ে হিপে রাখার কারণে সামান্য পরিমাণ ওভারহেড যোগ করে—এবং আমরা এটি কেবল টাইপগুলি মেলানোর জন্যই করছি। আমাদের আসলে প্রয়োজন নেই হিপ অ্যালোকেশনের, সর্বোপরি: এই ফিউচারগুলি এই নির্দিষ্ট ফাংশনের জন্য স্থানীয়। যেমন আগে উল্লেখ করা হয়েছে, Pin
নিজেই একটি র্যাপার টাইপ, তাই আমরা Vec
-এ একটি একক টাইপ থাকার সুবিধা পেতে পারি—যা ছিল Box
ব্যবহার করার মূল কারণ—হিপ অ্যালোকেশন ছাড়াই। আমরা প্রতিটি ফিউচারের সাথে সরাসরি Pin
ব্যবহার করতে পারি, std::pin::pin
ম্যাক্রো ব্যবহার করে।
যাইহোক, আমাদের এখনও পিন করা রেফারেন্সের টাইপ সম্পর্কে সুস্পষ্ট হতে হবে; অন্যথায়, রাস্ট এখনও জানবে না যে এগুলিকে ডাইনামিক ট্রেইট অবজেক্ট হিসাবে ব্যাখ্যা করতে হবে, যা Vec
-এ আমাদের প্রয়োজন। তাই আমরা std::pin
থেকে আমাদের ইমপোর্টের তালিকায় pin
যোগ করি। তারপরে আমরা প্রতিটি ফিউচারকে pin!
করতে পারি যখন আমরা এটি সংজ্ঞায়িত করি এবং futures
-কে ডাইনামিক ফিউচার টাইপের পিন করা মিউটেবল রেফারেন্স ধারণকারী একটি Vec
হিসাবে সংজ্ঞায়িত করি, যেমনটি লিস্টিং 17-19-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use std::pin::{Pin, pin}; // -- snip -- use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = pin!(async move { // --snip-- let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let rx_fut = pin!(async { // --snip-- while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); let tx_fut = pin!(async move { // --snip-- let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let futures: Vec<Pin<&mut dyn Future<Output = ()>>> = vec![tx1_fut, rx_fut, tx_fut]; trpl::join_all(futures).await; }); }
আমরা এই পর্যন্ত এসেছি এই সত্যটিকে উপেক্ষা করে যে আমাদের বিভিন্ন Output
টাইপ থাকতে পারে। উদাহরণস্বরূপ, লিস্টিং 17-20-এ, a
-এর জন্য নামহীন ফিউচারটি Future<Output = u32>
ইমপ্লিমেন্ট করে, b
-এর জন্য নামহীন ফিউচারটি Future<Output = &str>
ইমপ্লিমেন্ট করে, এবং c
-এর জন্য নামহীন ফিউচারটি Future<Output = bool>
ইমপ্লিমেন্ট করে।
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let a = async { 1u32 }; let b = async { "Hello!" }; let c = async { true }; let (a_result, b_result, c_result) = trpl::join!(a, b, c); println!("{a_result}, {b_result}, {c_result}"); }); }
আমরা তাদের await করার জন্য trpl::join!
ব্যবহার করতে পারি, কারণ এটি আমাদের একাধিক ফিউচার টাইপ পাস করার অনুমতি দেয় এবং সেই টাইপগুলির একটি টাপল তৈরি করে। আমরা trpl::join_all
ব্যবহার করতে পারি না, কারণ এটির জন্য পাস করা সমস্ত ফিউচারের একই টাইপ থাকা প্রয়োজন। মনে রাখবেন, সেই ত্রুটিটিই আমাদের Pin
-এর সাথে এই অ্যাডভেঞ্চার শুরু করিয়েছিল!
এটি একটি মৌলিক ট্রেড-অফ: আমরা হয় join_all
-এর সাথে একটি ডাইনামিক সংখ্যক ফিউচারের সাথে ডিল করতে পারি, যতক্ষণ না তাদের সবার একই টাইপ থাকে, অথবা আমরা join
ফাংশন বা join!
ম্যাক্রোর সাথে একটি নির্দিষ্ট সংখ্যক ফিউচারের সাথে ডিল করতে পারি, এমনকি তাদের বিভিন্ন টাইপ থাকলেও। এটি একই পরিস্থিতি যা আমরা রাস্টে অন্য কোনো টাইপের সাথে কাজ করার সময় সম্মুখীন হতাম। ফিউচারগুলি বিশেষ কিছু নয়, যদিও আমাদের তাদের সাথে কাজ করার জন্য কিছু চমৎকার সিনট্যাক্স আছে, এবং এটি একটি ভালো জিনিস।
ফিউচার রেসিং
যখন আমরা join
পরিবারের ফাংশন এবং ম্যাক্রোগুলোর সাথে ফিউচারগুলিকে "জয়েন" করি, তখন আমাদের এগিয়ে যাওয়ার আগে সেগুলির সবগুলো শেষ হওয়ার প্রয়োজন হয়। তবে কখনও কখনও, এগিয়ে যাওয়ার আগে আমাদের একটি সেট থেকে কেবল কিছু ফিউচার শেষ হলেই চলে— অনেকটা একটি ফিউচারকে অন্যটির বিরুদ্ধে রেস করানোর মতো।
লিস্টিং 17-21-এ, আমরা আবারও trpl::race
ব্যবহার করে দুটি ফিউচার, slow
এবং fast
-কে একে অপরের বিরুদ্ধে চালাই।
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let slow = async { println!("'slow' started."); trpl::sleep(Duration::from_millis(100)).await; println!("'slow' finished."); }; let fast = async { println!("'fast' started."); trpl::sleep(Duration::from_millis(50)).await; println!("'fast' finished."); }; trpl::race(slow, fast).await; }); }
প্রতিটি ফিউচার যখন চলতে শুরু করে তখন একটি বার্তা প্রিন্ট করে, sleep
কল করে এবং await করে কিছু সময়ের জন্য পজ করে, এবং তারপরে যখন এটি শেষ হয় তখন আরেকটি বার্তা প্রিন্ট করে। তারপরে আমরা slow
এবং fast
উভয়কেই trpl::race
-এ পাস করি এবং তাদের মধ্যে একটি শেষ হওয়ার জন্য অপেক্ষা করি। (এখানের ফলাফল খুব আশ্চর্যজনক নয়: fast
জেতে।) যখন আমরা “Our First Async Program”-এ race
ব্যবহার করেছিলাম তার থেকে ভিন্ন, আমরা এখানে এটি রিটার্ন করা Either
ইন্সট্যান্সটিকে উপেক্ষা করি, কারণ সমস্ত আকর্ষণীয় আচরণ async ব্লকগুলির বডিতে ঘটে।
লক্ষ্য করুন যে আপনি যদি race
-এর আর্গুমেন্টের ক্রম উল্টে দেন, তবে "started" বার্তাগুলির ক্রম পরিবর্তিত হয়, যদিও fast
ফিউচারটি সবসময় প্রথমে সম্পন্ন হয়। এর কারণ হলো এই নির্দিষ্ট race
ফাংশনের ইমপ্লিমেন্টেশনটি ফেয়ার (fair) নয়। এটি সর্বদা আর্গুমেন্ট হিসাবে পাস করা ফিউচারগুলিকে যে ক্রমে পাস করা হয় সেই ক্রমে চালায়। অন্যান্য ইমপ্লিমেন্টেশনগুলি ফেয়ার এবং এলোমেলোভাবে বেছে নেবে কোন ফিউচারটি প্রথমে পোল (poll) করতে হবে। race
-এর যে ইমপ্লিমেন্টেশন আমরা ব্যবহার করছি তা ফেয়ার হোক বা না হোক, একটি ফিউচার অন্য টাস্ক শুরু করার আগে তার বডিতে প্রথম await
পর্যন্ত চলবে।
Our First Async Program থেকে স্মরণ করুন যে প্রতিটি await পয়েন্টে, রাস্ট একটি রানটাইমকে টাস্কটি পজ করার এবং অন্য একটিতে স্যুইচ করার সুযোগ দেয় যদি await করা ফিউচারটি প্রস্তুত না থাকে। এর বিপরীতটিও সত্য: রাস্ট কেবলমাত্র একটি await পয়েন্টে async ব্লকগুলি পজ করে এবং একটি রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দেয়। await পয়েন্টগুলির মধ্যে সবকিছুই সিঙ্ক্রোনাস।
এর মানে হলো যদি আপনি একটি await পয়েন্ট ছাড়াই একটি async ব্লকে অনেক কাজ করেন, তবে সেই ফিউচারটি অন্য কোনো ফিউচারকে অগ্রগতি করতে বাধা দেবে। আপনি কখনও কখনও এটিকে একটি ফিউচার অন্য ফিউচারকে স্টার্ভিং (starving) হিসাবে উল্লেখ করতে শুনতে পারেন। কিছু ক্ষেত্রে, এটি একটি বড় ব্যাপার নাও হতে পারে। যাইহোক, যদি আপনি কোনো ধরনের ব্যয়বহুল সেটআপ বা দীর্ঘ সময় ধরে চলা কাজ করছেন, বা যদি আপনার একটি ফিউচার থাকে যা অনির্দিষ্টকালের জন্য কোনো নির্দিষ্ট কাজ করতে থাকবে, তবে আপনাকে কখন এবং কোথায় রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দিতে হবে তা নিয়ে ভাবতে হবে।
একইভাবে, যদি আপনার দীর্ঘ সময় ধরে চলা ব্লকিং অপারেশন থাকে, তবে async প্রোগ্রামের বিভিন্ন অংশ একে অপরের সাথে কীভাবে সম্পর্কিত হবে তার উপায় সরবরাহ করার জন্য একটি দরকারী টুল হতে পারে।
কিন্তু সেই ক্ষেত্রে আপনি কীভাবে রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দেবেন?
রানটাইমকে নিয়ন্ত্রণ ছেড়ে দেওয়া (Yielding Control)
আসুন একটি দীর্ঘ সময় ধরে চলা অপারেশন সিমুলেট করি। লিস্টিং 17-22 একটি slow
ফাংশন উপস্থাপন করে।
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { // We will call `slow` here later }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
এই কোডটি trpl::sleep
-এর পরিবর্তে std::thread::sleep
ব্যবহার করে যাতে slow
কল করা বর্তমান থ্রেডটিকে কিছু সংখ্যক মিলিসেকেন্ডের জন্য ব্লক করে। আমরা slow
-কে বাস্তব-বিশ্বের অপারেশনগুলির বিকল্প হিসাবে ব্যবহার করতে পারি যা দীর্ঘ সময় ধরে চলে এবং ব্লকিং।
লিস্টিং 17-23-এ, আমরা একজোড়া ফিউচারে এই ধরনের CPU-বাউন্ড কাজ অনুকরণ করতে slow
ব্যবহার করি।
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); slow("a", 10); slow("a", 20); trpl::sleep(Duration::from_millis(50)).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); slow("b", 10); slow("b", 15); slow("b", 350); trpl::sleep(Duration::from_millis(50)).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
শুরু করার জন্য, প্রতিটি ফিউচার কেবল একগুচ্ছ ধীর অপারেশন করার পরেই রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দেয়। আপনি যদি এই কোডটি চালান, আপনি এই আউটপুট দেখতে পাবেন:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
আমাদের আগের উদাহরণের মতো, a
শেষ হওয়ার সাথে সাথেই race
শেষ হয়। তবে, দুটি ফিউচারের মধ্যে কোনো ইন্টারলিভিং নেই। a
ফিউচারটি trpl::sleep
কল await না হওয়া পর্যন্ত তার সমস্ত কাজ করে, তারপর b
ফিউচারটি তার নিজের trpl::sleep
কল await না হওয়া পর্যন্ত তার সমস্ত কাজ করে, এবং অবশেষে a
ফিউচারটি সম্পন্ন হয়। উভয় ফিউচারকে তাদের ধীর কাজগুলির মধ্যে অগ্রগতি করার অনুমতি দিতে, আমাদের await পয়েন্ট প্রয়োজন যাতে আমরা রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দিতে পারি। এর মানে হলো আমাদের এমন কিছু দরকার যা আমরা await করতে পারি!
আমরা ইতিমধ্যে লিস্টিং 17-23-এ এই ধরনের হ্যান্ডঅফ দেখতে পাচ্ছি: যদি আমরা a
ফিউচারের শেষে trpl::sleep
সরিয়ে ফেলি, তবে এটি b
ফিউচারটি একেবারেই না চলেই সম্পন্ন হবে। আসুন অপারেশনগুলিকে অগ্রগতিতে স্যুইচ করার জন্য একটি সূচনা বিন্দু হিসাবে sleep
ফাংশনটি ব্যবহার করার চেষ্টা করি, যেমনটি লিস্টিং 17-24-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let one_ms = Duration::from_millis(1); let a = async { println!("'a' started."); slow("a", 30); trpl::sleep(one_ms).await; slow("a", 10); trpl::sleep(one_ms).await; slow("a", 20); trpl::sleep(one_ms).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::sleep(one_ms).await; slow("b", 10); trpl::sleep(one_ms).await; slow("b", 15); trpl::sleep(one_ms).await; slow("b", 350); trpl::sleep(one_ms).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
লিস্টিং 17-24-এ, আমরা slow
-এর প্রতিটি কলের মধ্যে await পয়েন্ট সহ trpl::sleep
কল যোগ করি। এখন দুটি ফিউচারের কাজ ইন্টারলিভড (interleaved) বা মিশ্রিত:
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
a
ফিউচারটি b
-কে নিয়ন্ত্রণ হস্তান্তর করার আগে এখনও কিছুক্ষণ চলে, কারণ এটি trpl::sleep
কল করার আগেই slow
কল করে, কিন্তু তারপরে ফিউচারগুলি প্রতিবার যখন তাদের মধ্যে একটি await পয়েন্টে পৌঁছায় তখন অদলবদল করে। এই ক্ষেত্রে, আমরা slow
-এর প্রতিটি কলের পরে এটি করেছি, কিন্তু আমরা কাজটি আমাদের জন্য সবচেয়ে অর্থপূর্ণ উপায়ে ভাগ করতে পারতাম।
তবে আমরা এখানে সত্যিই স্লিপ করতে চাই না: আমরা যত দ্রুত সম্ভব অগ্রগতি করতে চাই। আমাদের কেবল রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দিতে হবে। আমরা এটি সরাসরি করতে পারি, yield_now
ফাংশন ব্যবহার করে। লিস্টিং 17-25-এ, আমরা সেই সমস্ত sleep
কলগুলিকে yield_now
দিয়ে প্রতিস্থাপন করি।
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); trpl::yield_now().await; slow("a", 10); trpl::yield_now().await; slow("a", 20); trpl::yield_now().await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::yield_now().await; slow("b", 10); trpl::yield_now().await; slow("b", 15); trpl::yield_now().await; slow("b", 350); trpl::yield_now().await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
এই কোডটি প্রকৃত উদ্দেশ্য সম্পর্কে আরও স্পষ্ট এবং sleep
ব্যবহার করার চেয়ে উল্লেখযোগ্যভাবে দ্রুত হতে পারে, কারণ sleep
দ্বারা ব্যবহৃত টাইমারগুলির মতো টাইমারগুলির প্রায়শই তারা কতটা সূক্ষ্ম হতে পারে তার উপর সীমাবদ্ধতা থাকে। আমরা যে sleep
-এর সংস্করণটি ব্যবহার করছি, উদাহরণস্বরূপ, এটি সর্বদা কমপক্ষে এক মিলিসেকেন্ডের জন্য ঘুমাবে, এমনকি যদি আমরা এটিকে এক ন্যানোসেকেন্ডের একটি Duration
পাস করি। আবারও, আধুনিক কম্পিউটারগুলি দ্রুত: তারা এক মিলিসেকেন্ডে অনেক কিছু করতে পারে!
আপনি লিস্টিং 17-26-এর মতো একটি ছোট বেঞ্চমার্ক সেট আপ করে এটি নিজেই দেখতে পারেন। (এটি পারফরম্যান্স পরীক্ষা করার জন্য একটি বিশেষ কঠোর উপায় নয়, তবে এটি এখানে পার্থক্য দেখানোর জন্য যথেষ্ট।)
extern crate trpl; // required for mdbook test use std::time::{Duration, Instant}; fn main() { trpl::run(async { let one_ns = Duration::from_nanos(1); let start = Instant::now(); async { for _ in 1..1000 { trpl::sleep(one_ns).await; } } .await; let time = Instant::now() - start; println!( "'sleep' version finished after {} seconds.", time.as_secs_f32() ); let start = Instant::now(); async { for _ in 1..1000 { trpl::yield_now().await; } } .await; let time = Instant::now() - start; println!( "'yield' version finished after {} seconds.", time.as_secs_f32() ); }); }
এখানে, আমরা সমস্ত স্ট্যাটাস প্রিন্টিং এড়িয়ে যাই, trpl::sleep
-কে এক-ন্যানোসেকেন্ড Duration
পাস করি, এবং প্রতিটি ফিউচারকে নিজে চলতে দিই, ফিউচারগুলির মধ্যে কোনো স্যুইচিং ছাড়াই। তারপরে আমরা ১,০০০ ইটারেশন চালাই এবং দেখি trpl::sleep
ব্যবহারকারী ফিউচারটি trpl::yield_now
ব্যবহারকারী ফিউচারের তুলনায় কত সময় নেয়।
yield_now
সহ সংস্করণটি অনেক দ্রুত!
এর মানে হলো যে async এমনকি কম্পিউট-বাউন্ড টাস্কগুলির জন্যও উপযোগী হতে পারে, আপনার প্রোগ্রাম আর কী করছে তার উপর নির্ভর করে, কারণ এটি প্রোগ্রামের বিভিন্ন অংশের মধ্যে সম্পর্ক কাঠামোবদ্ধ করার জন্য একটি দরকারী টুল সরবরাহ করে। এটি কো-অপারেটিভ মাল্টিটাস্কিং (cooperative multitasking)-এর একটি রূপ, যেখানে প্রতিটি ফিউচারের await পয়েন্টের মাধ্যমে কখন নিয়ন্ত্রণ হস্তান্তর করবে তা নির্ধারণ করার ক্ষমতা রয়েছে। তাই প্রতিটি ফিউচারেরও খুব বেশিক্ষণ ব্লক করা এড়ানোর দায়িত্ব রয়েছে। কিছু রাস্ট-ভিত্তিক এমবেডেড অপারেটিং সিস্টেমে, এটিই একমাত্র ধরনের মাল্টিটাস্কিং!
বাস্তব-জগতের কোডে, আপনি সাধারণত প্রতিটি লাইনে await পয়েন্টগুলির সাথে ফাংশন কলগুলিকে অদলবদল করবেন না, অবশ্যই। যদিও এইভাবে নিয়ন্ত্রণ হস্তান্তর করা তুলনামূলকভাবে সস্তা, এটি বিনামূল্যে নয়। অনেক ক্ষেত্রে, একটি কম্পিউট-বাউন্ড টাস্ককে ভাঙার চেষ্টা করলে এটি উল্লেখযোগ্যভাবে ধীর হতে পারে, তাই কখনও কখনও সামগ্রিক পারফরম্যান্সের জন্য একটি অপারেশনকে সংক্ষিপ্তভাবে ব্লক করতে দেওয়া ভালো। আপনার কোডের প্রকৃত পারফরম্যান্সের বাধাগুলি কী তা দেখতে সর্বদা পরিমাপ করুন। তবে, যদি আপনি এমন অনেক কাজ সিরিয়ালে হতে দেখেন যা আপনি কনকারেন্টলি হওয়ার আশা করেছিলেন, তবে অন্তর্নিহিত গতিশীলতাটি মনে রাখা গুরুত্বপূর্ণ!
আমাদের নিজস্ব অ্যাসিঙ্ক্রোনাস অ্যাবস্ট্র্যাকশন তৈরি করা
আমরা নতুন প্যাটার্ন তৈরি করতে ফিউচারগুলিকে একসাথে কম্পোজও করতে পারি। উদাহরণস্বরূপ, আমরা আমাদের কাছে ইতিমধ্যে থাকা async বিল্ডিং ব্লকগুলি দিয়ে একটি timeout
ফাংশন তৈরি করতে পারি। যখন আমরা শেষ করব, ফলাফলটি আরেকটি বিল্ডিং ব্লক হবে যা আমরা আরও async অ্যাবস্ট্র্যাকশন তৈরি করতে ব্যবহার করতে পারি।
লিস্টিং 17-27 দেখায় যে আমরা এই timeout
একটি ধীর ফিউচারের সাথে কীভাবে কাজ করবে বলে আশা করব।
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let slow = async {
trpl::sleep(Duration::from_millis(100)).await;
"I finished!"
};
match timeout(slow, Duration::from_millis(10)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
আসুন এটি ইমপ্লিমেন্ট করি! শুরু করার জন্য, আসুন timeout
-এর API সম্পর্কে চিন্তা করি:
- এটি নিজে একটি async ফাংশন হতে হবে যাতে আমরা এটিকে await করতে পারি।
- এর প্রথম প্যারামিটারটি চালানোর জন্য একটি ফিউচার হওয়া উচিত। আমরা এটিকে যেকোনো ফিউচারের সাথে কাজ করার অনুমতি দেওয়ার জন্য জেনেরিক করতে পারি।
- এর দ্বিতীয় প্যারামিটারটি হবে অপেক্ষা করার সর্বোচ্চ সময়। যদি আমরা একটি
Duration
ব্যবহার করি, তবে এটিtrpl::sleep
-এ পাস করা সহজ হবে। - এটি একটি
Result
রিটার্ন করা উচিত। যদি ফিউচারটি সফলভাবে সম্পন্ন হয়,Result
টিOk
হবে ফিউচার দ্বারা উৎপাদিত মান সহ। যদি টাইমআউটটি আগে শেষ হয়ে যায়,Result
টিErr
হবে টাইমআউটটি যে সময় অপেক্ষা করেছে সেই সময়কাল সহ।
লিস্টিং 17-28 এই ডিক্লারেশনটি দেখায়।
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
// Here is where our implementation will go!
}
এটি টাইপের জন্য আমাদের লক্ষ্য পূরণ করে। এখন আসুন আমাদের প্রয়োজনীয় আচরণ সম্পর্কে চিন্তা করি: আমরা পাস করা ফিউচারটিকে সময়কালের বিরুদ্ধে রেস করতে চাই। আমরা সময়কাল থেকে একটি টাইমার ফিউচার তৈরি করতে trpl::sleep
ব্যবহার করতে পারি, এবং সেই টাইমারটিকে কলারের পাস করা ফিউচারের সাথে চালানোর জন্য trpl::race
ব্যবহার করতে পারি।
আমরা আরও জানি যে race
ফেয়ার নয়, আর্গুমেন্টগুলিকে যে ক্রমে পাস করা হয় সেই ক্রমে পোল করে। সুতরাং, আমরা future_to_try
-কে প্রথমে race
-এ পাস করি যাতে max_time
খুব কম সময় হলেও এটি সম্পন্ন হওয়ার সুযোগ পায়। যদি future_to_try
প্রথমে শেষ হয়, race
future_to_try
-এর আউটপুট সহ Left
রিটার্ন করবে। যদি timer
প্রথমে শেষ হয়, race
টাইমারের আউটপুট ()
সহ Right
রিটার্ন করবে।
লিস্টিং 17-29-এ, আমরা trpl::race
await করার ফলাফলের উপর ম্যাচ করি।
extern crate trpl; // required for mdbook test use std::time::Duration; use trpl::Either; // --snip-- fn main() { trpl::run(async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "Finally finished" }; match timeout(slow, Duration::from_secs(2)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); } async fn timeout<F: Future>( future_to_try: F, max_time: Duration, ) -> Result<F::Output, Duration> { match trpl::race(future_to_try, trpl::sleep(max_time)).await { Either::Left(output) => Ok(output), Either::Right(_) => Err(max_time), } }
যদি future_to_try
সফল হয় এবং আমরা একটি Left(output)
পাই, আমরা Ok(output)
রিটার্ন করি। যদি স্লিপ টাইমারটি পরিবর্তে শেষ হয়ে যায় এবং আমরা একটি Right(())
পাই, আমরা _
দিয়ে ()
উপেক্ষা করি এবং পরিবর্তে Err(max_time)
রিটার্ন করি।
এর সাথে, আমাদের কাছে দুটি অন্য async হেল্পার দিয়ে তৈরি একটি কার্যকরী timeout
আছে। যদি আমরা আমাদের কোড চালাই, এটি টাইমআউটের পরে ব্যর্থতার মোড প্রিন্ট করবে:
Failed after 2 seconds
যেহেতু ফিউচারগুলি অন্য ফিউচারের সাথে কম্পোজ করে, আপনি ছোট async বিল্ডিং ব্লক ব্যবহার করে সত্যিই শক্তিশালী টুল তৈরি করতে পারেন। উদাহরণস্বরূপ, আপনি রিট্রাইয়ের সাথে টাইমআউট একত্রিত করতে এই একই পদ্ধতি ব্যবহার করতে পারেন, এবং পরিবর্তে সেগুলি নেটওয়ার্ক কলের মতো অপারেশনের সাথে ব্যবহার করতে পারেন (অধ্যায়ের শুরুর উদাহরণগুলির মধ্যে একটি)।
বাস্তবে, আপনি সাধারণত সরাসরি async
এবং await
-এর সাথে কাজ করবেন, এবং দ্বিতীয়ত join
, join_all
, race
, এবং আরও অনেক ফাংশন এবং ম্যাক্রোর সাথে কাজ করবেন। আপনাকে শুধুমাত্র সেই API-গুলির সাথে ফিউচার ব্যবহার করার জন্য মাঝে মাঝে pin
-এর প্রয়োজন হবে।
আমরা এখন একই সময়ে একাধিক ফিউচারের সাথে কাজ করার বেশ কয়েকটি উপায় দেখেছি। এরপরে, আমরা দেখব কীভাবে আমরা সময়ের সাথে সাথে স্ট্রিম (streams) দিয়ে একটি ক্রমানুসারে একাধিক ফিউচারের সাথে কাজ করতে পারি। তবে, প্রথমে আপনার বিবেচনা করার মতো আরও কয়েকটি বিষয় এখানে রয়েছে:
-
কোনো গ্রুপের সমস্ত ফিউচার শেষ হওয়ার জন্য অপেক্ষা করতে আমরা
join_all
-এর সাথে একটিVec
ব্যবহার করেছি। আপনি কীভাবে একটিVec
ব্যবহার করে ফিউচারের একটি গ্রুপকে ক্রমানুসারে প্রসেস করতে পারেন? এটি করার ট্রেড-অফগুলি কী কী? -
futures
ক্রেট থেকেfutures::stream::FuturesUnordered
টাইপটি দেখুন। এটি ব্যবহার করা একটিVec
ব্যবহার করার থেকে কীভাবে ভিন্ন হবে? (ক্রেটেরstream
অংশ থেকে এটি এসেছে এই সত্যটি নিয়ে চিন্তা করবেন না; এটি যেকোনো ফিউচারের কালেকশনের সাথে ঠিকঠাক কাজ করে।)