Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

স্ট্রীম: পর্যায়ক্রমিক ফিউচার (Futures in Sequence)

এই অধ্যায়ে এখন পর্যন্ত আমরা মূলত স্বতন্ত্র ফিউচার (individual futures) নিয়ে কাজ করেছি। এর একটি বড় ব্যতিক্রম ছিল আমরা যে অ্যাসিঙ্ক্রোনাস চ্যানেলটি ব্যবহার করেছি। মনে করুন, এই অধ্যায়ের শুরুতে ["Message Passing"][17-02-messages] বিভাগে আমরা আমাদের অ্যাসিঙ্ক্রোনাস চ্যানেলের রিসিভারটি কীভাবে ব্যবহার করেছি। অ্যাসিঙ্ক্রোনাস recv মেথড সময়ের সাথে সাথে আইটেমের একটি ক্রম তৈরি করে। এটি একটি অনেক বেশি সাধারণ প্যাটার্নের উদাহরণ যা স্ট্রীম (stream) নামে পরিচিত।

আমরা চ্যাপ্টার ১৩-এ আইটেমের একটি ক্রম দেখেছিলাম, যখন আমরা [The Iterator Trait and the next Method][iterator-trait] বিভাগে Iterator ট্রেইট নিয়ে আলোচনা করেছিলাম, কিন্তু ইটারেটর এবং অ্যাসিঙ্ক্রোনাস চ্যানেল রিসিভারের মধ্যে দুটি পার্থক্য রয়েছে। প্রথম পার্থক্য হলো সময়: ইটারেটরগুলি সিঙ্ক্রোনাস, যেখানে চ্যানেল রিসিভার অ্যাসিঙ্ক্রোনাস। দ্বিতীয়টি হলো API। সরাসরি Iterator-এর সাথে কাজ করার সময়, আমরা এর সিঙ্ক্রোনাস next মেথড কল করি। বিশেষ করে trpl::Receiver স্ট্রীমের সাথে, আমরা এর পরিবর্তে একটি অ্যাসিঙ্ক্রোনাস recv মেথড কল করেছি। অন্যথায়, এই API-গুলি খুব অনুরূপ মনে হয়, এবং এই সাদৃশ্যটি কোনো কাকতালীয় ঘটনা নয়। একটি স্ট্রীম হলো ইটারেশনের একটি অ্যাসিঙ্ক্রোনাস রূপ। যেখানে trpl::Receiver বিশেষভাবে বার্তা পাওয়ার জন্য অপেক্ষা করে, সেখানে সাধারণ-উদ্দেশ্যমূলক স্ট্রীম API অনেক বেশি বিস্তৃত: এটি Iterator-এর মতো পরবর্তী আইটেম সরবরাহ করে, কিন্তু অ্যাসিঙ্ক্রোনাসভাবে।

রাস্টে ইটারেটর এবং স্ট্রীমের মধ্যে সাদৃশ্য থাকার মানে হলো আমরা আসলে যেকোনো ইটারেটর থেকে একটি স্ট্রীম তৈরি করতে পারি। একটি ইটারেটরের মতো, আমরা একটি স্ট্রীমের next মেথড কল করে এবং তারপর আউটপুট await করে এর সাথে কাজ করতে পারি, যেমনটি লিস্টিং 17-30-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}

আমরা সংখ্যার একটি অ্যারে দিয়ে শুরু করি, যেটিকে আমরা একটি ইটারেটরে রূপান্তর করি এবং তারপর সমস্ত মান দ্বিগুণ করার জন্য map কল করি। তারপর আমরা trpl::stream_from_iter ফাংশন ব্যবহার করে ইটারেটরটিকে একটি স্ট্রীমে রূপান্তর করি। এরপর, আমরা while let লুপ দিয়ে স্ট্রীমের আইটেমগুলির উপর লুপ করি যখন সেগুলি আসে।

দুর্ভাগ্যবশত, যখন আমরা কোডটি চালানোর চেষ্টা করি, এটি কম্পাইল হয় না, বরং এটি রিপোর্ট করে যে কোনো next মেথড উপলব্ধ নেই:

error[E0599]: no method named `next` found for struct `Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = note: the full type name has been written to 'file:///projects/async-await/target/debug/deps/async_await-575db3dd3197d257.long-type-14490787947592691573.txt'
   = note: consider using `--verbose` to print the full type name to the console
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

এই আউটপুট যেমন ব্যাখ্যা করে, কম্পাইলার ত্রুটির কারণ হলো next মেথড ব্যবহার করতে সক্ষম হওয়ার জন্য আমাদের সঠিক ট্রেইটটি স্কোপে (scope) প্রয়োজন। আমাদের এখন পর্যন্ত আলোচনার ভিত্তিতে, আপনি যুক্তিসঙ্গতভাবে আশা করতে পারেন যে সেই ট্রেইটটি হবে Stream, কিন্তু এটি আসলে StreamExtএক্সটেনশন (extension)-এর সংক্ষিপ্ত রূপ, Ext হলো রাস্ট কমিউনিটিতে একটি সাধারণ প্যাটার্ন একটি ট্রেইটকে অন্য একটি দিয়ে প্রসারিত করার জন্য।

আমরা অধ্যায়ের শেষে Stream এবং StreamExt ট্রেইটগুলি সম্পর্কে আরও কিছু বিস্তারিত ব্যাখ্যা করব, কিন্তু আপাতত আপনার যা জানা দরকার তা হলো Stream ট্রেইট একটি নিম্ন-স্তরের ইন্টারফেস সংজ্ঞায়িত করে যা কার্যকরভাবে Iterator এবং Future ট্রেইটগুলিকে একত্রিত করে। StreamExt Stream-এর উপরে একটি উচ্চ-স্তরের API সেট সরবরাহ করে, যার মধ্যে next মেথড এবং Iterator ট্রেইট দ্বারা প্রদত্ত অন্যান্য ইউটিলিটি মেথডগুলির মতো অন্যান্য মেথডও রয়েছে। Stream এবং StreamExt এখনও রাস্টের স্ট্যান্ডার্ড লাইব্রেরির অংশ নয়, তবে বেশিরভাগ ইকোসিস্টেম ক্রেট একই সংজ্ঞা ব্যবহার করে।

কম্পাইলার ত্রুটির সমাধান হলো trpl::StreamExt-এর জন্য একটি use স্টেটমেন্ট যোগ করা, যেমনটি লিস্টিং 17-31-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}

এই সমস্ত অংশগুলি একসাথে রাখলে, এই কোডটি আমাদের ইচ্ছামতো কাজ করে! আরও কী, এখন যেহেতু আমাদের স্কোপে StreamExt আছে, আমরা এর সমস্ত ইউটিলিটি মেথড ব্যবহার করতে পারি, ঠিক ইটারেটরের মতোই। উদাহরণস্বরূপ, লিস্টিং 17-32-এ, আমরা তিন এবং পাঁচের গুণিতক ছাড়া বাকি সবকিছু ফিল্টার করতে filter মেথড ব্যবহার করি।

extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = 1..101;
        let iter = values.map(|n| n * 2);
        let stream = trpl::stream_from_iter(iter);

        let mut filtered =
            stream.filter(|value| value % 3 == 0 || value % 5 == 0);

        while let Some(value) = filtered.next().await {
            println!("The value was: {value}");
        }
    });
}

অবশ্যই, এটি খুব আকর্ষণীয় নয়, যেহেতু আমরা সাধারণ ইটারেটর দিয়ে এবং কোনো async ছাড়াই একই কাজ করতে পারতাম। আসুন দেখি স্ট্রীমের জন্য অনন্য কী করা যায়।

স্ট্রীম কম্পোজ করা (Composing Streams)

অনেক ধারণা স্বাভাবিকভাবেই স্ট্রীম হিসাবে উপস্থাপিত হয়: একটি কিউ (queue)-তে আইটেম উপলব্ধ হওয়া, ফাইলসিস্টেম থেকে ডেটার খণ্ডাংশ ধীরে ধীরে আনা যখন পুরো ডেটা সেট কম্পিউটারের মেমরির জন্য খুব বড় হয়, অথবা সময়ের সাথে সাথে নেটওয়ার্কের মাধ্যমে ডেটা আসা। যেহেতু স্ট্রীমগুলি ফিউচার, আমরা সেগুলিকে অন্য যেকোনো ধরনের ফিউচারের সাথে ব্যবহার করতে পারি এবং সেগুলিকে আকর্ষণীয় উপায়ে একত্রিত করতে পারি। উদাহরণস্বরূপ, আমরা খুব বেশি নেটওয়ার্ক কল ট্রিগার করা এড়াতে ইভেন্টগুলিকে ব্যাচ করতে পারি, দীর্ঘ সময় ধরে চলা অপারেশনগুলির সিকোয়েন্সে টাইমআউট সেট করতে পারি, অথবা অপ্রয়োজনীয় কাজ করা এড়াতে ইউজার ইন্টারফেস ইভেন্টগুলিকে থ্রটল করতে পারি।

আসুন আমরা একটি ছোট বার্তা স্ট্রীম তৈরি করে শুরু করি যা ওয়েবসকেট বা অন্য কোনো রিয়েল-টাইম কমিউনিকেশন প্রোটোকল থেকে আমরা যে ডেটা স্ট্রীম দেখতে পারি তার একটি বিকল্প হিসাবে কাজ করবে, যেমনটি লিস্টিং 17-33-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages = get_messages();

        while let Some(message) = messages.next().await {
            println!("{message}");
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}

প্রথমে, আমরা get_messages নামে একটি ফাংশন তৈরি করি যা impl Stream<Item = String> রিটার্ন করে। এর ইমপ্লিমেন্টেশনের জন্য, আমরা একটি async চ্যানেল তৈরি করি, ইংরেজি বর্ণমালার প্রথম ১০টি অক্ষরের উপর লুপ করি, এবং সেগুলি চ্যানেলের মাধ্যমে পাঠাই।

আমরা একটি নতুন টাইপও ব্যবহার করি: ReceiverStream, যা trpl::channel থেকে rx রিসিভারকে next মেথড সহ একটি Stream-এ রূপান্তরিত করে। main-এ ফিরে, আমরা স্ট্রীম থেকে সমস্ত বার্তা প্রিন্ট করতে একটি while let লুপ ব্যবহার করি।

যখন আমরা এই কোডটি চালাই, আমরা ঠিক সেই ফলাফল পাই যা আমরা আশা করেছিলাম:

Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'

আবারও, আমরা এটি নিয়মিত Receiver API বা এমনকি নিয়মিত Iterator API দিয়েও করতে পারতাম, তাই আসুন এমন একটি ফিচার যোগ করি যার জন্য স্ট্রীম প্রয়োজন: স্ট্রীমের প্রতিটি আইটেমের জন্য একটি টাইমআউট যোগ করা, এবং আমরা যে আইটেমগুলি নির্গত করি সেগুলিতে একটি বিলম্ব যোগ করা, যেমনটি লিস্টিং 17-34-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}

আমরা timeout মেথড দিয়ে স্ট্রীমে একটি টাইমআউট যোগ করে শুরু করি, যা StreamExt ট্রেইট থেকে আসে। তারপর আমরা while let লুপের বডি আপডেট করি, কারণ স্ট্রীমটি এখন একটি Result রিটার্ন করে। Ok ভ্যারিয়েন্টটি নির্দেশ করে যে একটি বার্তা সময়মতো এসেছে; Err ভ্যারিয়েন্টটি নির্দেশ করে যে কোনো বার্তা আসার আগে টাইমআউট শেষ হয়ে গেছে। আমরা সেই ফলাফলের উপর match করি এবং হয় সফলভাবে বার্তা পেলে সেটি প্রিন্ট করি অথবা টাইমআউট সম্পর্কে একটি বিজ্ঞপ্তি প্রিন্ট করি। অবশেষে, লক্ষ্য করুন যে আমরা টাইমআউট প্রয়োগ করার পরে বার্তাগুলি পিন করি, কারণ টাইমআউট হেল্পার একটি স্ট্রীম তৈরি করে যা পোল করার জন্য পিন করা প্রয়োজন।

তবে, বার্তাগুলির মধ্যে কোনো বিলম্ব না থাকায়, এই টাইমআউটটি প্রোগ্রামের আচরণ পরিবর্তন করে না। আসুন আমরা যে বার্তাগুলি পাঠাই সেগুলিতে একটি পরিবর্তনশীল বিলম্ব যোগ করি, যেমনটি লিস্টিং 17-35-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

get_messages-এ, আমরা messages অ্যারের সাথে enumerate ইটারেটর মেথড ব্যবহার করি যাতে আমরা আইটেমটির সাথে আমরা যে প্রতিটি আইটেম পাঠাচ্ছি তার ইনডেক্স পেতে পারি। তারপর আমরা জোড়-ইনডেক্স আইটেমগুলিতে ১০০-মিলিসেকেন্ড বিলম্ব এবং বিজোড়-ইনডেক্স আইটেমগুলিতে ৩০০-মিলিসেকেন্ড বিলম্ব প্রয়োগ করি যাতে বাস্তব জগতে আমরা একটি বার্তা স্ট্রীম থেকে যে বিভিন্ন বিলম্ব দেখতে পারি তা অনুকরণ করা যায়। যেহেতু আমাদের টাইমআউট ২০০ মিলিসেকেন্ডের জন্য, এটি অর্ধেক বার্তাগুলিকে প্রভাবিত করা উচিত।

get_messages ফাংশনে বার্তাগুলির মধ্যে স্লিপ করার জন্য ব্লকিং ছাড়াই, আমাদের async ব্যবহার করতে হবে। যাইহোক, আমরা get_messages নিজেই একটি async ফাংশন করতে পারি না, কারণ তাহলে আমরা Stream<Item = String>>-এর পরিবর্তে একটি Future<Output = Stream<Item = String>> রিটার্ন করতাম। কলারকে স্ট্রীমে অ্যাক্সেস পেতে get_messages নিজেই await করতে হতো। কিন্তু মনে রাখবেন: একটি নির্দিষ্ট ফিউচারের মধ্যে সবকিছু রৈখিকভাবে ঘটে; কনকারেন্সি ফিউচারগুলির মধ্যে ঘটে। get_messages await করার জন্য রিসিভার স্ট্রীম রিটার্ন করার আগে প্রতিটি বার্তার মধ্যে স্লিপ বিলম্ব সহ সমস্ত বার্তা পাঠাতে হতো। ফলস্বরূপ, টাইমআউটটি অকেজো হয়ে যেত। স্ট্রীমে নিজেই কোনো বিলম্ব থাকত না; স্ট্রীমটি উপলব্ধ হওয়ার আগেই সেগুলি ঘটত।

পরিবর্তে, আমরা get_messages-কে একটি নিয়মিত ফাংশন হিসাবে রেখে দিই যা একটি স্ট্রীম রিটার্ন করে, এবং আমরা async sleep কলগুলি পরিচালনা করার জন্য একটি টাস্ক তৈরি করি।

দ্রষ্টব্য: এইভাবে spawn_task কল করা কাজ করে কারণ আমরা ইতিমধ্যে আমাদের রানটাইম সেট আপ করেছি; যদি আমরা না করতাম, এটি একটি প্যানিক (panic) সৃষ্টি করত। অন্যান্য ইমপ্লিমেন্টেশনগুলি বিভিন্ন ট্রেড-অফ বেছে নেয়: তারা একটি নতুন রানটাইম তৈরি করতে পারে এবং প্যানিক এড়াতে পারে কিন্তু সামান্য অতিরিক্ত ওভারহেডের সাথে শেষ হতে পারে, অথবা তারা রানটাইমের রেফারেন্স ছাড়াই টাস্ক তৈরি করার জন্য একটি স্বতন্ত্র উপায় সরবরাহ নাও করতে পারে। নিশ্চিত করুন যে আপনি জানেন আপনার রানটাইম কোন ট্রেড-অফ বেছে নিয়েছে এবং সেই অনুযায়ী আপনার কোড লিখুন!

এখন আমাদের কোডের একটি অনেক বেশি আকর্ষণীয় ফলাফল রয়েছে। প্রতি দুটি বার্তার মধ্যে, একটি Problem: Elapsed(()) ত্রুটি দেখা যায়।

Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'

টাইমআউটটি শেষ পর্যন্ত বার্তাগুলিকে আসতে বাধা দেয় না। আমরা এখনও সমস্ত মূল বার্তা পাই, কারণ আমাদের চ্যানেলটি আনবাউন্ডেড (unbounded): এটি মেমরিতে যতগুলি বার্তা ফিট করতে পারে ততগুলি ধরে রাখতে পারে। যদি বার্তাটি টাইমআউটের আগে না আসে, আমাদের স্ট্রীম হ্যান্ডলার সেটির হিসাব রাখবে, কিন্তু যখন এটি আবার স্ট্রীমটি পোল করবে, তখন বার্তাটি এখন এসে থাকতে পারে।

প্রয়োজনে আপনি অন্যান্য ধরনের চ্যানেল বা সাধারণভাবে অন্যান্য ধরনের স্ট্রীম ব্যবহার করে ভিন্ন আচরণ পেতে পারেন। আসুন সেগুলির মধ্যে একটি বাস্তবে দেখি সময় ব্যবধানের একটি স্ট্রীমকে এই বার্তাগুলির স্ট্রীমের সাথে একত্রিত করে।

স্ট্রীম মার্জ করা (Merging Streams)

প্রথমে, আসুন আরেকটি স্ট্রীম তৈরি করি, যা সরাসরি চালালে প্রতি মিলিসেকেন্ডে একটি আইটেম নির্গত করবে। সরলতার জন্য, আমরা একটি বিলম্ব সহ একটি বার্তা পাঠাতে sleep ফাংশন ব্যবহার করতে পারি এবং এটিকে get_messages-এ ব্যবহৃত একই পদ্ধতির সাথে একত্রিত করতে পারি যেখানে একটি চ্যানেল থেকে একটি স্ট্রীম তৈরি করা হয়েছিল। পার্থক্য হলো এইবার, আমরা অতিবাহিত ব্যবধানের সংখ্যা ফেরত পাঠাতে যাচ্ছি, তাই রিটার্ন টাইপ হবে impl Stream<Item = u32>, এবং আমরা ফাংশনটিকে get_intervals বলতে পারি (দেখুন লিস্টিং 17-36)।

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

আমরা টাস্কের মধ্যে একটি count সংজ্ঞায়িত করে শুরু করি। (আমরা এটিকে টাস্কের বাইরেও সংজ্ঞায়িত করতে পারতাম, তবে যেকোনো প্রদত্ত ভেরিয়েবলের স্কোপ সীমিত রাখা পরিষ্কার।) তারপর আমরা একটি অসীম লুপ তৈরি করি। লুপের প্রতিটি ইটারেশন অ্যাসিঙ্ক্রোনাসভাবে এক মিলিসেকেন্ড ঘুমায়, কাউন্ট বৃদ্ধি করে, এবং তারপর এটি চ্যানেলের মাধ্যমে পাঠায়। যেহেতু এই সবকিছুই spawn_task দ্বারা তৈরি করা টাস্কের মধ্যে মোড়ানো, তাই এর সবকিছু—অসীম লুপ সহ—রানটাইমের সাথে পরিষ্কার হয়ে যাবে।

এই ধরনের অসীম লুপ, যা কেবল তখনই শেষ হয় যখন পুরো রানটাইমটি ভেঙে যায়, async রাস্টে বেশ সাধারণ: অনেক প্রোগ্রামের অনির্দিষ্টকালের জন্য চলতে থাকতে হয়। async-এর সাথে, এটি অন্য কিছু ব্লক করে না, যতক্ষণ না লুপের প্রতিটি ইটারেশনে অন্তত একটি await পয়েন্ট থাকে।

এখন, আমাদের মূল ফাংশনের async ব্লকে ফিরে, আমরা messages এবং intervals স্ট্রীমগুলিকে মার্জ করার চেষ্টা করতে পারি, যেমনটি লিস্টিং 17-37-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals();
        let merged = messages.merge(intervals);

        while let Some(result) = merged.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

আমরা get_intervals কল করে শুরু করি। তারপর আমরা messages এবং intervals স্ট্রীমগুলিকে merge মেথড দিয়ে মার্জ করি, যা একাধিক স্ট্রীমকে একটি স্ট্রীমে একত্রিত করে যা যেকোনো সোর্স স্ট্রীম থেকে আইটেমগুলি উপলব্ধ হওয়ার সাথে সাথেই তৈরি করে, কোনো নির্দিষ্ট ক্রম আরোপ না করে। অবশেষে, আমরা messages-এর পরিবর্তে সেই একত্রিত স্ট্রীমের উপর লুপ করি।

এই মুহুর্তে, messages বা intervals কোনোটিকেই পিন বা মিউটেবল (mutable) করার প্রয়োজন নেই, কারণ উভয়ই একক merged স্ট্রীমে একত্রিত হবে। যাইহোক, merge-এর এই কলটি কম্পাইল হয় না! (while let লুপে next কলটিও হয় না, তবে আমরা সেটিতে পরে ফিরে আসব।) এর কারণ হলো দুটি স্ট্রীমের বিভিন্ন টাইপ রয়েছে। messages স্ট্রীমটির টাইপ Timeout<impl Stream<Item = String>>, যেখানে Timeout হলো সেই টাইপ যা একটি timeout কলের জন্য Stream ইমপ্লিমেন্ট করে। intervals স্ট্রীমটির টাইপ impl Stream<Item = u32>। এই দুটি স্ট্রীমকে মার্জ করতে, আমাদের একটিকে অন্যটির সাথে মেলানোর জন্য রূপান্তর করতে হবে। আমরা intervals স্ট্রীমটি পুনরায় কাজ করব, কারণ messages ইতিমধ্যে আমাদের কাঙ্ক্ষিত মৌলিক বিন্যাসে রয়েছে এবং টাইমআউট ত্রুটিগুলি পরিচালনা করতে হবে (দেখুন লিস্টিং 17-38)।

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval: {count}"))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

প্রথমত, আমরা intervals-কে একটি স্ট্রিং-এ রূপান্তর করতে map হেল্পার মেথড ব্যবহার করতে পারি। দ্বিতীয়ত, আমাদের messages থেকে Timeout-এর সাথে মেলাতে হবে। যেহেতু আমরা আসলে intervals-এর জন্য কোনো টাইমআউট চাই না, তাই আমরা কেবল একটি টাইমআউট তৈরি করতে পারি যা আমাদের ব্যবহার করা অন্যান্য সময়কালের চেয়ে দীর্ঘ। এখানে, আমরা Duration::from_secs(10) দিয়ে একটি ১০-সেকেন্ডের টাইমআউট তৈরি করি। অবশেষে, আমাদের stream-কে মিউটেবল করতে হবে, যাতে while let লুপের next কলগুলি স্ট্রীমের মধ্য দিয়ে ইটারেট করতে পারে, এবং এটিকে পিন করতে হবে যাতে এটি করা নিরাপদ হয়। এটি আমাদের প্রায় যেখানে পৌঁছানো দরকার সেখানে নিয়ে যায়। সবকিছু টাইপ চেক করে। তবে আপনি যদি এটি চালান, দুটি সমস্যা হবে। প্রথমত, এটি কখনই থামবে না! আপনাকে ctrl-c দিয়ে এটি থামাতে হবে। দ্বিতীয়ত, ইংরেজি বর্ণমালার বার্তাগুলি সমস্ত ব্যবধান কাউন্টার বার্তাগুলির মধ্যে চাপা পড়ে যাবে:

--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--

লিস্টিং 17-39 এই শেষ দুটি সমস্যার সমাধান করার একটি উপায় দেখায়।

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval: {count}"))
            .throttle(Duration::from_millis(100))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

প্রথমে, আমরা intervals স্ট্রীমে throttle মেথড ব্যবহার করি যাতে এটি messages স্ট্রীমকে অভিভূত না করে। থ্রটলিং (throttling) হলো একটি ফাংশন কত ঘন ঘন কল করা হবে—অথবা, এই ক্ষেত্রে, স্ট্রীমটি কত ঘন ঘন পোল করা হবে—তা সীমিত করার একটি উপায়। প্রতি ১০০ মিলিসেকেন্ডে একবার করলেই হবে, কারণ আমাদের বার্তাগুলি প্রায় তত ঘন ঘনই আসে।

আমরা একটি স্ট্রীম থেকে কতগুলি আইটেম গ্রহণ করব তা সীমিত করতে, আমরা merged স্ট্রীমে take মেথড প্রয়োগ করি, কারণ আমরা কেবল একটি স্ট্রীম বা অন্যটিকে নয়, চূড়ান্ত আউটপুট সীমিত করতে চাই।

এখন যখন আমরা প্রোগ্রামটি চালাই, এটি স্ট্রীম থেকে ২০টি আইটেম নেওয়ার পরে থেমে যায়, এবং ব্যবধানগুলি বার্তাগুলিকে অভিভূত করে না। আমরা Interval: 100 বা Interval: 200 ইত্যাদিও পাই না, বরং Interval: 1, Interval: 2, ইত্যাদি পাই—যদিও আমাদের কাছে একটি সোর্স স্ট্রীম আছে যা প্রতি মিলিসেকেন্ডে একটি ইভেন্ট তৈরি করতে পারে। এর কারণ হলো throttle কল একটি নতুন স্ট্রীম তৈরি করে যা মূল স্ট্রীমটিকে র‍্যাপ করে যাতে মূল স্ট্রীমটি কেবল থ্রটল হারে পোল করা হয়, তার নিজস্ব "নেটিভ" হারে নয়। আমাদের কাছে একগুচ্ছ অব্যবহৃত ব্যবধান বার্তা নেই যা আমরা উপেক্ষা করার সিদ্ধান্ত নিচ্ছি। পরিবর্তে, আমরা প্রথম স্থানে সেই ব্যবধান বার্তাগুলি তৈরিই করি না! এটি রাস্টের ফিউচারের অন্তর্নিহিত "অলসতা" আবার কাজে লাগছে, যা আমাদের পারফরম্যান্স বৈশিষ্ট্যগুলি বেছে নেওয়ার সুযোগ দেয়।

Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12

আমাদের শেষ একটি জিনিস সামলাতে হবে: ত্রুটি! এই দুটি চ্যানেল-ভিত্তিক স্ট্রীমের সাথে, চ্যানেলের অন্য দিকটি বন্ধ হয়ে গেলে send কলগুলি ব্যর্থ হতে পারে—এবং এটি কেবল রানটাইম কীভাবে স্ট্রীম গঠনকারী ফিউচারগুলি চালায় তার উপর নির্ভর করে। এখন পর্যন্ত, আমরা unwrap কল করে এই সম্ভাবনাটি উপেক্ষা করেছি, কিন্তু একটি সুশৃঙ্খল অ্যাপে, আমাদের স্পষ্টভাবে ত্রুটিটি পরিচালনা করা উচিত, ন্যূনতমভাবে লুপটি শেষ করে যাতে আমরা আর কোনো বার্তা পাঠানোর চেষ্টা না করি। লিস্টিং 17-40 একটি সাধারণ ত্রুটি কৌশল দেখায়: সমস্যাটি প্রিন্ট করা এবং তারপর লুপগুলি থেকে break করা।

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval #{count}"))
            .throttle(Duration::from_millis(500))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(item) => println!("{item}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}```

</Listing>

যথারীতি, একটি বার্তা পাঠানোর ত্রুটি পরিচালনা করার সঠিক উপায় পরিবর্তিত হবে; শুধু নিশ্চিত করুন যে আপনার একটি কৌশল আছে।

এখন যেহেতু আমরা বাস্তবে অনেক async দেখেছি, আসুন এক ধাপ পিছিয়ে গিয়ে `Future`, `Stream`, এবং রাস্ট async কাজ করানোর জন্য যে অন্যান্য মূল ট্রেইটগুলি ব্যবহার করে সেগুলির কিছু বিশদ বিবরণে যাই।

[17-02-messages]: ch17-02-concurrency-with-async.html#message-passing
[iterator-trait]: ch13-02-iterators.html#the-iterator-trait-and-the-next-method