Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

যেকোনো সংখ্যক ফিউচারের সাথে কাজ করা

পূর্ববর্তী বিভাগে যখন আমরা দুটি ফিউচার থেকে তিনটি ফিউচারে স্যুইচ করেছি, তখন আমাদের join থেকে join3 ব্যবহার করতে হয়েছিল। আমরা যতবার ফিউচারের সংখ্যা পরিবর্তন করব, ততবার একটি ভিন্ন ফাংশন কল করতে হলে তা বিরক্তিকর হতো। আনন্দের বিষয়, আমাদের কাছে join-এর একটি ম্যাক্রো ফর্ম রয়েছে যেখানে আমরা ইচ্ছামত আর্গুমেন্ট পাস করতে পারি। এটি ফিউচারগুলোকে await করার কাজও নিজেই করে। সুতরাং, আমরা লিস্টিং 17-13 থেকে কোডটি join3-এর পরিবর্তে join! ব্যবহার করে পুনরায় লিখতে পারি, যেমনটি লিস্টিং 17-14-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}

এটি অবশ্যই join এবং join3 এবং join4 ইত্যাদির মধ্যে অদলবদল করার চেয়ে একটি উন্নতি! যাইহোক, এমনকি এই ম্যাক্রো ফর্মটি কেবল তখনই কাজ করে যখন আমরা আগে থেকে ফিউচারের সংখ্যা জানি। কিন্তু বাস্তব-জগতের রাস্ট কোডে, ফিউচারগুলোকে একটি কালেকশনে পুশ করা এবং তারপরে তাদের কিছু বা সমস্ত ফিউচার সম্পূর্ণ হওয়ার জন্য অপেক্ষা করা একটি সাধারণ প্যাটার্ন।

কোনো কালেকশনের সমস্ত ফিউচার পরীক্ষা করার জন্য, আমাদের সেগুলোর সবগুলোর উপর ইটারেট করতে হবে এবং জয়েন করতে হবে। trpl::join_all ফাংশনটি যেকোনো টাইপ গ্রহণ করে যা Iterator ট্রেইট ইমপ্লিমেন্ট করে, যা আপনি চ্যাপ্টার ১৩-এর The Iterator Trait and the next Method-এ শিখেছেন, তাই এটি ঠিক কাজের জিনিস বলে মনে হচ্ছে। আসুন আমাদের ফিউচারগুলোকে একটি ভেক্টরে রাখি এবং join!-কে join_all দিয়ে প্রতিস্থাপন করার চেষ্টা করি যেমনটি লিস্টিং 17-15-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures = vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}

দুর্ভাগ্যবশত, এই কোডটি কম্পাইল হয় না। পরিবর্তে, আমরা এই ত্রুটিটি পাই:

error[E0308]: mismatched types
  --> src/main.rs:45:37
   |
10 |         let tx1_fut = async move {
   |                       ---------- the expected `async` block
...
24 |         let rx_fut = async {
   |                      ----- the found `async` block
...
45 |         let futures = vec![tx1_fut, rx_fut, tx_fut];
   |                                     ^^^^^^ expected `async` block, found a different `async` block
   |
   = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
              found `async` block `{async block@src/main.rs:24:22: 24:27}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object

এটি আশ্চর্যজনক হতে পারে। সর্বোপরি, কোনো async ব্লকই কিছু রিটার্ন করে না, তাই প্রতিটি একটি Future<Output = ()> তৈরি করে। মনে রাখবেন যে Future একটি ট্রেইট, এবং কম্পাইলার প্রতিটি async ব্লকের জন্য একটি অনন্য enum তৈরি করে। আপনি একটি Vec-এ দুটি ভিন্ন হাতে লেখা struct রাখতে পারবেন না, এবং একই নিয়ম কম্পাইলার দ্বারা জেনারেট করা বিভিন্ন enum-এর ক্ষেত্রেও প্রযোজ্য।

এটি কাজ করানোর জন্য, আমাদের ট্রেইট অবজেক্ট ব্যবহার করতে হবে, ঠিক যেমনটি আমরা চ্যাপ্টার ১২-এর “Returning Errors from the run function”-এ করেছিলাম। (আমরা চ্যাপ্টার ১৮-এ ট্রেইট অবজেক্ট নিয়ে বিস্তারিত আলোচনা করব।) ট্রেইট অবজেক্ট ব্যবহার করে আমরা এই টাইপগুলো দ্বারা উৎপাদিত প্রতিটি নামহীন ফিউচারকে একই টাইপ হিসাবে বিবেচনা করতে পারি, কারণ সেগুলির সবগুলোই Future ট্রেইট ইমপ্লিমেন্ট করে।

দ্রষ্টব্য: চ্যাপ্টার ৮-এর Using an Enum to Store Multiple Values-এ, আমরা একটি Vec-এ একাধিক টাইপ অন্তর্ভুক্ত করার আরেকটি উপায় নিয়ে আলোচনা করেছি: ভেক্টরে উপস্থিত হতে পারে এমন প্রতিটি টাইপকে উপস্থাপন করার জন্য একটি enum ব্যবহার করা। তবে, আমরা এখানে তা করতে পারি না। একটি কারণ হলো, আমাদের বিভিন্ন টাইপের নামকরণ করার কোনো উপায় নেই, কারণ সেগুলি নামহীন। আরেকটি কারণ হলো, আমরা একটি ভেক্টর এবং join_all ব্যবহার করার মূল কারণটি ছিল ফিউচারের একটি ডাইনামিক কালেকশনের সাথে কাজ করতে পারা যেখানে আমরা কেবল তাদের একই আউটপুট টাইপ থাকা নিয়েই চিন্তা করি।

আমরা লিস্টিং 17-16-এ দেখানো হিসাবে vec!-এর প্রতিটি ফিউচারকে একটি Box::new-এ র‍্যাপ করে শুরু করি।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

        trpl::join_all(futures).await;
    });
}

দুর্ভাগ্যবশত, এই কোডটি এখনও কম্পাইল হয় না। আসলে, আমরা দ্বিতীয় এবং তৃতীয় Box::new কল উভয়ের জন্য আগের মতোই একই মৌলিক ত্রুটি পাই, সেইসাথে Unpin ট্রেইট উল্লেখ করে নতুন ত্রুটিও পাই। আমরা এক মুহূর্তের মধ্যে Unpin ত্রুটিগুলিতে ফিরে আসব। প্রথমে, আসুন futures ভেরিয়েবলের টাইপটি স্পষ্টভাবে উল্লেখ করে Box::new কলগুলিতে টাইপের ত্রুটিগুলি ঠিক করি (দেখুন লিস্টিং 17-17)।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures: Vec<Box<dyn Future<Output = ()>>> =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

        trpl::join_all(futures).await;
    });
}

এই টাইপ ডিক্লারেশনটি একটু জটিল, তাই আসুন এটি ধাপে ধাপে দেখি:

  1. সবচেয়ে ভেতরের টাইপটি হলো ফিউচার নিজেই। আমরা স্পষ্টভাবে উল্লেখ করি যে ফিউচারের আউটপুট হলো ইউনিট টাইপ () যা Future<Output = ()> লিখে করা হয়েছে।
  2. তারপর আমরা ট্রেইটটিকে ডাইনামিক হিসাবে চিহ্নিত করতে dyn দিয়ে টীকাবদ্ধ (annotate) করি।
  3. পুরো ট্রেইট রেফারেন্সটি একটি Box-এ মোড়ানো হয়।
  4. অবশেষে, আমরা স্পষ্টভাবে বলি যে futures হলো একটি Vec যা এই আইটেমগুলি ধারণ করে।

এটি ইতিমধ্যেই একটি বড় পার্থক্য তৈরি করেছে। এখন যখন আমরা কম্পাইলার চালাই, আমরা কেবল Unpin উল্লেখ করা ত্রুটিগুলি পাই। যদিও তিনটি ত্রুটি আছে, তাদের বিষয়বস্তু খুব অনুরূপ।

error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
   --> src/main.rs:49:24
    |
49  |         trpl::join_all(futures).await;
    |         -------------- ^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
    |         |
    |         required by a bound introduced by this call
    |
    = note: consider using the `pin!` macro
            consider using `Box::pin` if you need to access the pinned value outside of the current scope
    = note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `join_all`
   --> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:105:14
    |
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
    |        -------- required by a bound in this function
...
105 |     I::Item: Future,
    |              ^^^^^^ required by this bound in `join_all`

error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
  --> src/main.rs:49:9
   |
49 |         trpl::join_all(futures).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
  --> src/main.rs:49:33
   |
49 |         trpl::join_all(futures).await;
   |                                 ^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `async_await` (bin "async_await") due to 3 previous errors

এটি হজম করার জন্য অনেক কিছু, তাই আসুন এটি ভেঙে দেখি। বার্তার প্রথম অংশটি আমাদের বলে যে প্রথম async ব্লকটি (src/main.rs:8:23: 20:10) Unpin ট্রেইট ইমপ্লিমেন্ট করে না এবং এটি সমাধান করার জন্য pin! বা Box::pin ব্যবহার করার পরামর্শ দেয়। অধ্যায়ের পরে, আমরা Pin এবং Unpin সম্পর্কে আরও কিছু বিশদ বিবরণে যাব। তবে এই মুহূর্তে, আমরা কেবল আটকে যাওয়া অবস্থা থেকে বের হতে কম্পাইলারের পরামর্শ অনুসরণ করতে পারি। লিস্টিং 17-18-এ, আমরা std::pin থেকে Pin ইমপোর্ট করে শুরু করি। এরপরে আমরা futures-এর জন্য টাইপ অ্যানোটেশন আপডেট করি, প্রতিটি Box-কে একটি Pin দিয়ে র‍্যাপ করে। অবশেষে, আমরা ফিউচারগুলিকে পিন করার জন্য Box::pin ব্যবহার করি।

extern crate trpl; // required for mdbook test

use std::pin::Pin;

// -- snip --

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
            vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)];

        trpl::join_all(futures).await;
    });
}

যদি আমরা এটি কম্পাইল করে চালাই, আমরা অবশেষে সেই আউটপুট পাব যা আমরা আশা করেছিলাম:

received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'

যাক বাবা!

এখানে আরও কিছু অন্বেষণ করার আছে। একটি বিষয় হলো, Pin<Box<T>> ব্যবহার করা এই ফিউচারগুলিকে Box দিয়ে হিপে রাখার কারণে সামান্য পরিমাণ ওভারহেড যোগ করে—এবং আমরা এটি কেবল টাইপগুলি মেলানোর জন্যই করছি। আমাদের আসলে প্রয়োজন নেই হিপ অ্যালোকেশনের, সর্বোপরি: এই ফিউচারগুলি এই নির্দিষ্ট ফাংশনের জন্য স্থানীয়। যেমন আগে উল্লেখ করা হয়েছে, Pin নিজেই একটি র‍্যাপার টাইপ, তাই আমরা Vec-এ একটি একক টাইপ থাকার সুবিধা পেতে পারি—যা ছিল Box ব্যবহার করার মূল কারণ—হিপ অ্যালোকেশন ছাড়াই। আমরা প্রতিটি ফিউচারের সাথে সরাসরি Pin ব্যবহার করতে পারি, std::pin::pin ম্যাক্রো ব্যবহার করে।

যাইহোক, আমাদের এখনও পিন করা রেফারেন্সের টাইপ সম্পর্কে সুস্পষ্ট হতে হবে; অন্যথায়, রাস্ট এখনও জানবে না যে এগুলিকে ডাইনামিক ট্রেইট অবজেক্ট হিসাবে ব্যাখ্যা করতে হবে, যা Vec-এ আমাদের প্রয়োজন। তাই আমরা std::pin থেকে আমাদের ইমপোর্টের তালিকায় pin যোগ করি। তারপরে আমরা প্রতিটি ফিউচারকে pin! করতে পারি যখন আমরা এটি সংজ্ঞায়িত করি এবং futures-কে ডাইনামিক ফিউচার টাইপের পিন করা মিউটেবল রেফারেন্স ধারণকারী একটি Vec হিসাবে সংজ্ঞায়িত করি, যেমনটি লিস্টিং 17-19-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::pin::{Pin, pin};

// -- snip --

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let rx_fut = pin!(async {
            // --snip--
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        });

        let tx_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
            vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}

আমরা এই পর্যন্ত এসেছি এই সত্যটিকে উপেক্ষা করে যে আমাদের বিভিন্ন Output টাইপ থাকতে পারে। উদাহরণস্বরূপ, লিস্টিং 17-20-এ, a-এর জন্য নামহীন ফিউচারটি Future<Output = u32> ইমপ্লিমেন্ট করে, b-এর জন্য নামহীন ফিউচারটি Future<Output = &str> ইমপ্লিমেন্ট করে, এবং c-এর জন্য নামহীন ফিউচারটি Future<Output = bool> ইমপ্লিমেন্ট করে।

extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let a = async { 1u32 };
        let b = async { "Hello!" };
        let c = async { true };

        let (a_result, b_result, c_result) = trpl::join!(a, b, c);
        println!("{a_result}, {b_result}, {c_result}");
    });
}

আমরা তাদের await করার জন্য trpl::join! ব্যবহার করতে পারি, কারণ এটি আমাদের একাধিক ফিউচার টাইপ পাস করার অনুমতি দেয় এবং সেই টাইপগুলির একটি টাপল তৈরি করে। আমরা trpl::join_all ব্যবহার করতে পারি না, কারণ এটির জন্য পাস করা সমস্ত ফিউচারের একই টাইপ থাকা প্রয়োজন। মনে রাখবেন, সেই ত্রুটিটিই আমাদের Pin-এর সাথে এই অ্যাডভেঞ্চার শুরু করিয়েছিল!

এটি একটি মৌলিক ট্রেড-অফ: আমরা হয় join_all-এর সাথে একটি ডাইনামিক সংখ্যক ফিউচারের সাথে ডিল করতে পারি, যতক্ষণ না তাদের সবার একই টাইপ থাকে, অথবা আমরা join ফাংশন বা join! ম্যাক্রোর সাথে একটি নির্দিষ্ট সংখ্যক ফিউচারের সাথে ডিল করতে পারি, এমনকি তাদের বিভিন্ন টাইপ থাকলেও। এটি একই পরিস্থিতি যা আমরা রাস্টে অন্য কোনো টাইপের সাথে কাজ করার সময় সম্মুখীন হতাম। ফিউচারগুলি বিশেষ কিছু নয়, যদিও আমাদের তাদের সাথে কাজ করার জন্য কিছু চমৎকার সিনট্যাক্স আছে, এবং এটি একটি ভালো জিনিস।

ফিউচার রেসিং

যখন আমরা join পরিবারের ফাংশন এবং ম্যাক্রোগুলোর সাথে ফিউচারগুলিকে "জয়েন" করি, তখন আমাদের এগিয়ে যাওয়ার আগে সেগুলির সবগুলো শেষ হওয়ার প্রয়োজন হয়। তবে কখনও কখনও, এগিয়ে যাওয়ার আগে আমাদের একটি সেট থেকে কেবল কিছু ফিউচার শেষ হলেই চলে— অনেকটা একটি ফিউচারকে অন্যটির বিরুদ্ধে রেস করানোর মতো।

লিস্টিং 17-21-এ, আমরা আবারও trpl::race ব্যবহার করে দুটি ফিউচার, slow এবং fast-কে একে অপরের বিরুদ্ধে চালাই।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            println!("'slow' started.");
            trpl::sleep(Duration::from_millis(100)).await;
            println!("'slow' finished.");
        };

        let fast = async {
            println!("'fast' started.");
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'fast' finished.");
        };

        trpl::race(slow, fast).await;
    });
}

প্রতিটি ফিউচার যখন চলতে শুরু করে তখন একটি বার্তা প্রিন্ট করে, sleep কল করে এবং await করে কিছু সময়ের জন্য পজ করে, এবং তারপরে যখন এটি শেষ হয় তখন আরেকটি বার্তা প্রিন্ট করে। তারপরে আমরা slow এবং fast উভয়কেই trpl::race-এ পাস করি এবং তাদের মধ্যে একটি শেষ হওয়ার জন্য অপেক্ষা করি। (এখানের ফলাফল খুব আশ্চর্যজনক নয়: fast জেতে।) যখন আমরা “Our First Async Program”-এ race ব্যবহার করেছিলাম তার থেকে ভিন্ন, আমরা এখানে এটি রিটার্ন করা Either ইন্সট্যান্সটিকে উপেক্ষা করি, কারণ সমস্ত আকর্ষণীয় আচরণ async ব্লকগুলির বডিতে ঘটে।

লক্ষ্য করুন যে আপনি যদি race-এর আর্গুমেন্টের ক্রম উল্টে দেন, তবে "started" বার্তাগুলির ক্রম পরিবর্তিত হয়, যদিও fast ফিউচারটি সবসময় প্রথমে সম্পন্ন হয়। এর কারণ হলো এই নির্দিষ্ট race ফাংশনের ইমপ্লিমেন্টেশনটি ফেয়ার (fair) নয়। এটি সর্বদা আর্গুমেন্ট হিসাবে পাস করা ফিউচারগুলিকে যে ক্রমে পাস করা হয় সেই ক্রমে চালায়। অন্যান্য ইমপ্লিমেন্টেশনগুলি ফেয়ার এবং এলোমেলোভাবে বেছে নেবে কোন ফিউচারটি প্রথমে পোল (poll) করতে হবে। race-এর যে ইমপ্লিমেন্টেশন আমরা ব্যবহার করছি তা ফেয়ার হোক বা না হোক, একটি ফিউচার অন্য টাস্ক শুরু করার আগে তার বডিতে প্রথম await পর্যন্ত চলবে।

Our First Async Program থেকে স্মরণ করুন যে প্রতিটি await পয়েন্টে, রাস্ট একটি রানটাইমকে টাস্কটি পজ করার এবং অন্য একটিতে স্যুইচ করার সুযোগ দেয় যদি await করা ফিউচারটি প্রস্তুত না থাকে। এর বিপরীতটিও সত্য: রাস্ট কেবলমাত্র একটি await পয়েন্টে async ব্লকগুলি পজ করে এবং একটি রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দেয়। await পয়েন্টগুলির মধ্যে সবকিছুই সিঙ্ক্রোনাস।

এর মানে হলো যদি আপনি একটি await পয়েন্ট ছাড়াই একটি async ব্লকে অনেক কাজ করেন, তবে সেই ফিউচারটি অন্য কোনো ফিউচারকে অগ্রগতি করতে বাধা দেবে। আপনি কখনও কখনও এটিকে একটি ফিউচার অন্য ফিউচারকে স্টার্ভিং (starving) হিসাবে উল্লেখ করতে শুনতে পারেন। কিছু ক্ষেত্রে, এটি একটি বড় ব্যাপার নাও হতে পারে। যাইহোক, যদি আপনি কোনো ধরনের ব্যয়বহুল সেটআপ বা দীর্ঘ সময় ধরে চলা কাজ করছেন, বা যদি আপনার একটি ফিউচার থাকে যা অনির্দিষ্টকালের জন্য কোনো নির্দিষ্ট কাজ করতে থাকবে, তবে আপনাকে কখন এবং কোথায় রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দিতে হবে তা নিয়ে ভাবতে হবে।

একইভাবে, যদি আপনার দীর্ঘ সময় ধরে চলা ব্লকিং অপারেশন থাকে, তবে async প্রোগ্রামের বিভিন্ন অংশ একে অপরের সাথে কীভাবে সম্পর্কিত হবে তার উপায় সরবরাহ করার জন্য একটি দরকারী টুল হতে পারে।

কিন্তু সেই ক্ষেত্রে আপনি কীভাবে রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দেবেন?

রানটাইমকে নিয়ন্ত্রণ ছেড়ে দেওয়া (Yielding Control)

আসুন একটি দীর্ঘ সময় ধরে চলা অপারেশন সিমুলেট করি। লিস্টিং 17-22 একটি slow ফাংশন উপস্থাপন করে।

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        // We will call `slow` here later
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

এই কোডটি trpl::sleep-এর পরিবর্তে std::thread::sleep ব্যবহার করে যাতে slow কল করা বর্তমান থ্রেডটিকে কিছু সংখ্যক মিলিসেকেন্ডের জন্য ব্লক করে। আমরা slow-কে বাস্তব-বিশ্বের অপারেশনগুলির বিকল্প হিসাবে ব্যবহার করতে পারি যা দীর্ঘ সময় ধরে চলে এবং ব্লকিং।

লিস্টিং 17-23-এ, আমরা একজোড়া ফিউচারে এই ধরনের CPU-বাউন্ড কাজ অনুকরণ করতে slow ব্যবহার করি।

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

শুরু করার জন্য, প্রতিটি ফিউচার কেবল একগুচ্ছ ধীর অপারেশন করার পরেই রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দেয়। আপনি যদি এই কোডটি চালান, আপনি এই আউটপুট দেখতে পাবেন:

'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

আমাদের আগের উদাহরণের মতো, a শেষ হওয়ার সাথে সাথেই race শেষ হয়। তবে, দুটি ফিউচারের মধ্যে কোনো ইন্টারলিভিং নেই। a ফিউচারটি trpl::sleep কল await না হওয়া পর্যন্ত তার সমস্ত কাজ করে, তারপর b ফিউচারটি তার নিজের trpl::sleep কল await না হওয়া পর্যন্ত তার সমস্ত কাজ করে, এবং অবশেষে a ফিউচারটি সম্পন্ন হয়। উভয় ফিউচারকে তাদের ধীর কাজগুলির মধ্যে অগ্রগতি করার অনুমতি দিতে, আমাদের await পয়েন্ট প্রয়োজন যাতে আমরা রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দিতে পারি। এর মানে হলো আমাদের এমন কিছু দরকার যা আমরা await করতে পারি!

আমরা ইতিমধ্যে লিস্টিং 17-23-এ এই ধরনের হ্যান্ডঅফ দেখতে পাচ্ছি: যদি আমরা a ফিউচারের শেষে trpl::sleep সরিয়ে ফেলি, তবে এটি b ফিউচারটি একেবারেই না চলেই সম্পন্ন হবে। আসুন অপারেশনগুলিকে অগ্রগতিতে স্যুইচ করার জন্য একটি সূচনা বিন্দু হিসাবে sleep ফাংশনটি ব্যবহার করার চেষ্টা করি, যেমনটি লিস্টিং 17-24-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let one_ms = Duration::from_millis(1);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::sleep(one_ms).await;
            slow("a", 10);
            trpl::sleep(one_ms).await;
            slow("a", 20);
            trpl::sleep(one_ms).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::sleep(one_ms).await;
            slow("b", 10);
            trpl::sleep(one_ms).await;
            slow("b", 15);
            trpl::sleep(one_ms).await;
            slow("b", 350);
            trpl::sleep(one_ms).await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

লিস্টিং 17-24-এ, আমরা slow-এর প্রতিটি কলের মধ্যে await পয়েন্ট সহ trpl::sleep কল যোগ করি। এখন দুটি ফিউচারের কাজ ইন্টারলিভড (interleaved) বা মিশ্রিত:

'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

a ফিউচারটি b-কে নিয়ন্ত্রণ হস্তান্তর করার আগে এখনও কিছুক্ষণ চলে, কারণ এটি trpl::sleep কল করার আগেই slow কল করে, কিন্তু তারপরে ফিউচারগুলি প্রতিবার যখন তাদের মধ্যে একটি await পয়েন্টে পৌঁছায় তখন অদলবদল করে। এই ক্ষেত্রে, আমরা slow-এর প্রতিটি কলের পরে এটি করেছি, কিন্তু আমরা কাজটি আমাদের জন্য সবচেয়ে অর্থপূর্ণ উপায়ে ভাগ করতে পারতাম।

তবে আমরা এখানে সত্যিই স্লিপ করতে চাই না: আমরা যত দ্রুত সম্ভব অগ্রগতি করতে চাই। আমাদের কেবল রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দিতে হবে। আমরা এটি সরাসরি করতে পারি, yield_now ফাংশন ব্যবহার করে। লিস্টিং 17-25-এ, আমরা সেই সমস্ত sleep কলগুলিকে yield_now দিয়ে প্রতিস্থাপন করি।

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await;
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await;
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 350);
            trpl::yield_now().await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

এই কোডটি প্রকৃত উদ্দেশ্য সম্পর্কে আরও স্পষ্ট এবং sleep ব্যবহার করার চেয়ে উল্লেখযোগ্যভাবে দ্রুত হতে পারে, কারণ sleep দ্বারা ব্যবহৃত টাইমারগুলির মতো টাইমারগুলির প্রায়শই তারা কতটা সূক্ষ্ম হতে পারে তার উপর সীমাবদ্ধতা থাকে। আমরা যে sleep-এর সংস্করণটি ব্যবহার করছি, উদাহরণস্বরূপ, এটি সর্বদা কমপক্ষে এক মিলিসেকেন্ডের জন্য ঘুমাবে, এমনকি যদি আমরা এটিকে এক ন্যানোসেকেন্ডের একটি Duration পাস করি। আবারও, আধুনিক কম্পিউটারগুলি দ্রুত: তারা এক মিলিসেকেন্ডে অনেক কিছু করতে পারে!

আপনি লিস্টিং 17-26-এর মতো একটি ছোট বেঞ্চমার্ক সেট আপ করে এটি নিজেই দেখতে পারেন। (এটি পারফরম্যান্স পরীক্ষা করার জন্য একটি বিশেষ কঠোর উপায় নয়, তবে এটি এখানে পার্থক্য দেখানোর জন্য যথেষ্ট।)

extern crate trpl; // required for mdbook test

use std::time::{Duration, Instant};

fn main() {
    trpl::run(async {
        let one_ns = Duration::from_nanos(1);
        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::sleep(one_ns).await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'sleep' version finished after {} seconds.",
            time.as_secs_f32()
        );

        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::yield_now().await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'yield' version finished after {} seconds.",
            time.as_secs_f32()
        );
    });
}

এখানে, আমরা সমস্ত স্ট্যাটাস প্রিন্টিং এড়িয়ে যাই, trpl::sleep-কে এক-ন্যানোসেকেন্ড Duration পাস করি, এবং প্রতিটি ফিউচারকে নিজে চলতে দিই, ফিউচারগুলির মধ্যে কোনো স্যুইচিং ছাড়াই। তারপরে আমরা ১,০০০ ইটারেশন চালাই এবং দেখি trpl::sleep ব্যবহারকারী ফিউচারটি trpl::yield_now ব্যবহারকারী ফিউচারের তুলনায় কত সময় নেয়।

yield_now সহ সংস্করণটি অনেক দ্রুত!

এর মানে হলো যে async এমনকি কম্পিউট-বাউন্ড টাস্কগুলির জন্যও উপযোগী হতে পারে, আপনার প্রোগ্রাম আর কী করছে তার উপর নির্ভর করে, কারণ এটি প্রোগ্রামের বিভিন্ন অংশের মধ্যে সম্পর্ক কাঠামোবদ্ধ করার জন্য একটি দরকারী টুল সরবরাহ করে। এটি কো-অপারেটিভ মাল্টিটাস্কিং (cooperative multitasking)-এর একটি রূপ, যেখানে প্রতিটি ফিউচারের await পয়েন্টের মাধ্যমে কখন নিয়ন্ত্রণ হস্তান্তর করবে তা নির্ধারণ করার ক্ষমতা রয়েছে। তাই প্রতিটি ফিউচারেরও খুব বেশিক্ষণ ব্লক করা এড়ানোর দায়িত্ব রয়েছে। কিছু রাস্ট-ভিত্তিক এমবেডেড অপারেটিং সিস্টেমে, এটিই একমাত্র ধরনের মাল্টিটাস্কিং!

বাস্তব-জগতের কোডে, আপনি সাধারণত প্রতিটি লাইনে await পয়েন্টগুলির সাথে ফাংশন কলগুলিকে অদলবদল করবেন না, অবশ্যই। যদিও এইভাবে নিয়ন্ত্রণ হস্তান্তর করা তুলনামূলকভাবে সস্তা, এটি বিনামূল্যে নয়। অনেক ক্ষেত্রে, একটি কম্পিউট-বাউন্ড টাস্ককে ভাঙার চেষ্টা করলে এটি উল্লেখযোগ্যভাবে ধীর হতে পারে, তাই কখনও কখনও সামগ্রিক পারফরম্যান্সের জন্য একটি অপারেশনকে সংক্ষিপ্তভাবে ব্লক করতে দেওয়া ভালো। আপনার কোডের প্রকৃত পারফরম্যান্সের বাধাগুলি কী তা দেখতে সর্বদা পরিমাপ করুন। তবে, যদি আপনি এমন অনেক কাজ সিরিয়ালে হতে দেখেন যা আপনি কনকারেন্টলি হওয়ার আশা করেছিলেন, তবে অন্তর্নিহিত গতিশীলতাটি মনে রাখা গুরুত্বপূর্ণ!

আমাদের নিজস্ব অ্যাসিঙ্ক্রোনাস অ্যাবস্ট্র্যাকশন তৈরি করা

আমরা নতুন প্যাটার্ন তৈরি করতে ফিউচারগুলিকে একসাথে কম্পোজও করতে পারি। উদাহরণস্বরূপ, আমরা আমাদের কাছে ইতিমধ্যে থাকা async বিল্ডিং ব্লকগুলি দিয়ে একটি timeout ফাংশন তৈরি করতে পারি। যখন আমরা শেষ করব, ফলাফলটি আরেকটি বিল্ডিং ব্লক হবে যা আমরা আরও async অ্যাবস্ট্র্যাকশন তৈরি করতে ব্যবহার করতে পারি।

লিস্টিং 17-27 দেখায় যে আমরা এই timeout একটি ধীর ফিউচারের সাথে কীভাবে কাজ করবে বলে আশা করব।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_millis(100)).await;
            "I finished!"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

আসুন এটি ইমপ্লিমেন্ট করি! শুরু করার জন্য, আসুন timeout-এর API সম্পর্কে চিন্তা করি:

  • এটি নিজে একটি async ফাংশন হতে হবে যাতে আমরা এটিকে await করতে পারি।
  • এর প্রথম প্যারামিটারটি চালানোর জন্য একটি ফিউচার হওয়া উচিত। আমরা এটিকে যেকোনো ফিউচারের সাথে কাজ করার অনুমতি দেওয়ার জন্য জেনেরিক করতে পারি।
  • এর দ্বিতীয় প্যারামিটারটি হবে অপেক্ষা করার সর্বোচ্চ সময়। যদি আমরা একটি Duration ব্যবহার করি, তবে এটি trpl::sleep-এ পাস করা সহজ হবে।
  • এটি একটি Result রিটার্ন করা উচিত। যদি ফিউচারটি সফলভাবে সম্পন্ন হয়, Resultটি Ok হবে ফিউচার দ্বারা উৎপাদিত মান সহ। যদি টাইমআউটটি আগে শেষ হয়ে যায়, Resultটি Err হবে টাইমআউটটি যে সময় অপেক্ষা করেছে সেই সময়কাল সহ।

লিস্টিং 17-28 এই ডিক্লারেশনটি দেখায়।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}

এটি টাইপের জন্য আমাদের লক্ষ্য পূরণ করে। এখন আসুন আমাদের প্রয়োজনীয় আচরণ সম্পর্কে চিন্তা করি: আমরা পাস করা ফিউচারটিকে সময়কালের বিরুদ্ধে রেস করতে চাই। আমরা সময়কাল থেকে একটি টাইমার ফিউচার তৈরি করতে trpl::sleep ব্যবহার করতে পারি, এবং সেই টাইমারটিকে কলারের পাস করা ফিউচারের সাথে চালানোর জন্য trpl::race ব্যবহার করতে পারি।

আমরা আরও জানি যে race ফেয়ার নয়, আর্গুমেন্টগুলিকে যে ক্রমে পাস করা হয় সেই ক্রমে পোল করে। সুতরাং, আমরা future_to_try-কে প্রথমে race-এ পাস করি যাতে max_time খুব কম সময় হলেও এটি সম্পন্ন হওয়ার সুযোগ পায়। যদি future_to_try প্রথমে শেষ হয়, race future_to_try-এর আউটপুট সহ Left রিটার্ন করবে। যদি timer প্রথমে শেষ হয়, race টাইমারের আউটপুট () সহ Right রিটার্ন করবে।

লিস্টিং 17-29-এ, আমরা trpl::race await করার ফলাফলের উপর ম্যাচ করি।

extern crate trpl; // required for mdbook test

use std::time::Duration;

use trpl::Either;

// --snip--

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::race(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}

যদি future_to_try সফল হয় এবং আমরা একটি Left(output) পাই, আমরা Ok(output) রিটার্ন করি। যদি স্লিপ টাইমারটি পরিবর্তে শেষ হয়ে যায় এবং আমরা একটি Right(()) পাই, আমরা _ দিয়ে () উপেক্ষা করি এবং পরিবর্তে Err(max_time) রিটার্ন করি।

এর সাথে, আমাদের কাছে দুটি অন্য async হেল্পার দিয়ে তৈরি একটি কার্যকরী timeout আছে। যদি আমরা আমাদের কোড চালাই, এটি টাইমআউটের পরে ব্যর্থতার মোড প্রিন্ট করবে:

Failed after 2 seconds

যেহেতু ফিউচারগুলি অন্য ফিউচারের সাথে কম্পোজ করে, আপনি ছোট async বিল্ডিং ব্লক ব্যবহার করে সত্যিই শক্তিশালী টুল তৈরি করতে পারেন। উদাহরণস্বরূপ, আপনি রিট্রাইয়ের সাথে টাইমআউট একত্রিত করতে এই একই পদ্ধতি ব্যবহার করতে পারেন, এবং পরিবর্তে সেগুলি নেটওয়ার্ক কলের মতো অপারেশনের সাথে ব্যবহার করতে পারেন (অধ্যায়ের শুরুর উদাহরণগুলির মধ্যে একটি)।

বাস্তবে, আপনি সাধারণত সরাসরি async এবং await-এর সাথে কাজ করবেন, এবং দ্বিতীয়ত join, join_all, race, এবং আরও অনেক ফাংশন এবং ম্যাক্রোর সাথে কাজ করবেন। আপনাকে শুধুমাত্র সেই API-গুলির সাথে ফিউচার ব্যবহার করার জন্য মাঝে মাঝে pin-এর প্রয়োজন হবে।

আমরা এখন একই সময়ে একাধিক ফিউচারের সাথে কাজ করার বেশ কয়েকটি উপায় দেখেছি। এরপরে, আমরা দেখব কীভাবে আমরা সময়ের সাথে সাথে স্ট্রিম (streams) দিয়ে একটি ক্রমানুসারে একাধিক ফিউচারের সাথে কাজ করতে পারি। তবে, প্রথমে আপনার বিবেচনা করার মতো আরও কয়েকটি বিষয় এখানে রয়েছে:

  • কোনো গ্রুপের সমস্ত ফিউচার শেষ হওয়ার জন্য অপেক্ষা করতে আমরা join_all-এর সাথে একটি Vec ব্যবহার করেছি। আপনি কীভাবে একটি Vec ব্যবহার করে ফিউচারের একটি গ্রুপকে ক্রমানুসারে প্রসেস করতে পারেন? এটি করার ট্রেড-অফগুলি কী কী?

  • futures ক্রেট থেকে futures::stream::FuturesUnordered টাইপটি দেখুন। এটি ব্যবহার করা একটি Vec ব্যবহার করার থেকে কীভাবে ভিন্ন হবে? (ক্রেটের stream অংশ থেকে এটি এসেছে এই সত্যটি নিয়ে চিন্তা করবেন না; এটি যেকোনো ফিউচারের কালেকশনের সাথে ঠিকঠাক কাজ করে।)