স্ট্রীম: পর্যায়ক্রমিক ফিউচার (Futures in Sequence)
এই অধ্যায়ে এখন পর্যন্ত আমরা মূলত স্বতন্ত্র ফিউচার (individual futures) নিয়ে কাজ করেছি। এর একটি বড় ব্যতিক্রম ছিল আমরা যে অ্যাসিঙ্ক্রোনাস চ্যানেলটি ব্যবহার করেছি। মনে করুন, এই অধ্যায়ের শুরুতে ["Message Passing"][17-02-messages] বিভাগে আমরা আমাদের অ্যাসিঙ্ক্রোনাস চ্যানেলের রিসিভারটি কীভাবে ব্যবহার করেছি। অ্যাসিঙ্ক্রোনাস recv
মেথড সময়ের সাথে সাথে আইটেমের একটি ক্রম তৈরি করে। এটি একটি অনেক বেশি সাধারণ প্যাটার্নের উদাহরণ যা স্ট্রীম (stream) নামে পরিচিত।
আমরা চ্যাপ্টার ১৩-এ আইটেমের একটি ক্রম দেখেছিলাম, যখন আমরা [The Iterator Trait and the next
Method][iterator-trait] বিভাগে Iterator
ট্রেইট নিয়ে আলোচনা করেছিলাম, কিন্তু ইটারেটর এবং অ্যাসিঙ্ক্রোনাস চ্যানেল রিসিভারের মধ্যে দুটি পার্থক্য রয়েছে। প্রথম পার্থক্য হলো সময়: ইটারেটরগুলি সিঙ্ক্রোনাস, যেখানে চ্যানেল রিসিভার অ্যাসিঙ্ক্রোনাস। দ্বিতীয়টি হলো API। সরাসরি Iterator
-এর সাথে কাজ করার সময়, আমরা এর সিঙ্ক্রোনাস next
মেথড কল করি। বিশেষ করে trpl::Receiver
স্ট্রীমের সাথে, আমরা এর পরিবর্তে একটি অ্যাসিঙ্ক্রোনাস recv
মেথড কল করেছি। অন্যথায়, এই API-গুলি খুব অনুরূপ মনে হয়, এবং এই সাদৃশ্যটি কোনো কাকতালীয় ঘটনা নয়। একটি স্ট্রীম হলো ইটারেশনের একটি অ্যাসিঙ্ক্রোনাস রূপ। যেখানে trpl::Receiver
বিশেষভাবে বার্তা পাওয়ার জন্য অপেক্ষা করে, সেখানে সাধারণ-উদ্দেশ্যমূলক স্ট্রীম API অনেক বেশি বিস্তৃত: এটি Iterator
-এর মতো পরবর্তী আইটেম সরবরাহ করে, কিন্তু অ্যাসিঙ্ক্রোনাসভাবে।
রাস্টে ইটারেটর এবং স্ট্রীমের মধ্যে সাদৃশ্য থাকার মানে হলো আমরা আসলে যেকোনো ইটারেটর থেকে একটি স্ট্রীম তৈরি করতে পারি। একটি ইটারেটরের মতো, আমরা একটি স্ট্রীমের next
মেথড কল করে এবং তারপর আউটপুট await করে এর সাথে কাজ করতে পারি, যেমনটি লিস্টিং 17-30-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test
fn main() {
trpl::run(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
আমরা সংখ্যার একটি অ্যারে দিয়ে শুরু করি, যেটিকে আমরা একটি ইটারেটরে রূপান্তর করি এবং তারপর সমস্ত মান দ্বিগুণ করার জন্য map
কল করি। তারপর আমরা trpl::stream_from_iter
ফাংশন ব্যবহার করে ইটারেটরটিকে একটি স্ট্রীমে রূপান্তর করি। এরপর, আমরা while let
লুপ দিয়ে স্ট্রীমের আইটেমগুলির উপর লুপ করি যখন সেগুলি আসে।
দুর্ভাগ্যবশত, যখন আমরা কোডটি চালানোর চেষ্টা করি, এটি কম্পাইল হয় না, বরং এটি রিপোর্ট করে যে কোনো next
মেথড উপলব্ধ নেই:
error[E0599]: no method named `next` found for struct `Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= note: the full type name has been written to 'file:///projects/async-await/target/debug/deps/async_await-575db3dd3197d257.long-type-14490787947592691573.txt'
= note: consider using `--verbose` to print the full type name to the console
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
এই আউটপুট যেমন ব্যাখ্যা করে, কম্পাইলার ত্রুটির কারণ হলো next
মেথড ব্যবহার করতে সক্ষম হওয়ার জন্য আমাদের সঠিক ট্রেইটটি স্কোপে (scope) প্রয়োজন। আমাদের এখন পর্যন্ত আলোচনার ভিত্তিতে, আপনি যুক্তিসঙ্গতভাবে আশা করতে পারেন যে সেই ট্রেইটটি হবে Stream
, কিন্তু এটি আসলে StreamExt
। এক্সটেনশন (extension)-এর সংক্ষিপ্ত রূপ, Ext
হলো রাস্ট কমিউনিটিতে একটি সাধারণ প্যাটার্ন একটি ট্রেইটকে অন্য একটি দিয়ে প্রসারিত করার জন্য।
আমরা অধ্যায়ের শেষে Stream
এবং StreamExt
ট্রেইটগুলি সম্পর্কে আরও কিছু বিস্তারিত ব্যাখ্যা করব, কিন্তু আপাতত আপনার যা জানা দরকার তা হলো Stream
ট্রেইট একটি নিম্ন-স্তরের ইন্টারফেস সংজ্ঞায়িত করে যা কার্যকরভাবে Iterator
এবং Future
ট্রেইটগুলিকে একত্রিত করে। StreamExt
Stream
-এর উপরে একটি উচ্চ-স্তরের API সেট সরবরাহ করে, যার মধ্যে next
মেথড এবং Iterator
ট্রেইট দ্বারা প্রদত্ত অন্যান্য ইউটিলিটি মেথডগুলির মতো অন্যান্য মেথডও রয়েছে। Stream
এবং StreamExt
এখনও রাস্টের স্ট্যান্ডার্ড লাইব্রেরির অংশ নয়, তবে বেশিরভাগ ইকোসিস্টেম ক্রেট একই সংজ্ঞা ব্যবহার করে।
কম্পাইলার ত্রুটির সমাধান হলো trpl::StreamExt
-এর জন্য একটি use
স্টেটমেন্ট যোগ করা, যেমনটি লিস্টিং 17-31-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let iter = values.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(value) = stream.next().await { println!("The value was: {value}"); } }); }
এই সমস্ত অংশগুলি একসাথে রাখলে, এই কোডটি আমাদের ইচ্ছামতো কাজ করে! আরও কী, এখন যেহেতু আমাদের স্কোপে StreamExt
আছে, আমরা এর সমস্ত ইউটিলিটি মেথড ব্যবহার করতে পারি, ঠিক ইটারেটরের মতোই। উদাহরণস্বরূপ, লিস্টিং 17-32-এ, আমরা তিন এবং পাঁচের গুণিতক ছাড়া বাকি সবকিছু ফিল্টার করতে filter
মেথড ব্যবহার করি।
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = 1..101; let iter = values.map(|n| n * 2); let stream = trpl::stream_from_iter(iter); let mut filtered = stream.filter(|value| value % 3 == 0 || value % 5 == 0); while let Some(value) = filtered.next().await { println!("The value was: {value}"); } }); }
অবশ্যই, এটি খুব আকর্ষণীয় নয়, যেহেতু আমরা সাধারণ ইটারেটর দিয়ে এবং কোনো async ছাড়াই একই কাজ করতে পারতাম। আসুন দেখি স্ট্রীমের জন্য অনন্য কী করা যায়।
স্ট্রীম কম্পোজ করা (Composing Streams)
অনেক ধারণা স্বাভাবিকভাবেই স্ট্রীম হিসাবে উপস্থাপিত হয়: একটি কিউ (queue)-তে আইটেম উপলব্ধ হওয়া, ফাইলসিস্টেম থেকে ডেটার খণ্ডাংশ ধীরে ধীরে আনা যখন পুরো ডেটা সেট কম্পিউটারের মেমরির জন্য খুব বড় হয়, অথবা সময়ের সাথে সাথে নেটওয়ার্কের মাধ্যমে ডেটা আসা। যেহেতু স্ট্রীমগুলি ফিউচার, আমরা সেগুলিকে অন্য যেকোনো ধরনের ফিউচারের সাথে ব্যবহার করতে পারি এবং সেগুলিকে আকর্ষণীয় উপায়ে একত্রিত করতে পারি। উদাহরণস্বরূপ, আমরা খুব বেশি নেটওয়ার্ক কল ট্রিগার করা এড়াতে ইভেন্টগুলিকে ব্যাচ করতে পারি, দীর্ঘ সময় ধরে চলা অপারেশনগুলির সিকোয়েন্সে টাইমআউট সেট করতে পারি, অথবা অপ্রয়োজনীয় কাজ করা এড়াতে ইউজার ইন্টারফেস ইভেন্টগুলিকে থ্রটল করতে পারি।
আসুন আমরা একটি ছোট বার্তা স্ট্রীম তৈরি করে শুরু করি যা ওয়েবসকেট বা অন্য কোনো রিয়েল-টাইম কমিউনিকেশন প্রোটোকল থেকে আমরা যে ডেটা স্ট্রীম দেখতে পারি তার একটি বিকল্প হিসাবে কাজ করবে, যেমনটি লিস্টিং 17-33-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = get_messages(); while let Some(message) = messages.next().await { println!("{message}"); } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
প্রথমে, আমরা get_messages
নামে একটি ফাংশন তৈরি করি যা impl Stream<Item = String>
রিটার্ন করে। এর ইমপ্লিমেন্টেশনের জন্য, আমরা একটি async চ্যানেল তৈরি করি, ইংরেজি বর্ণমালার প্রথম ১০টি অক্ষরের উপর লুপ করি, এবং সেগুলি চ্যানেলের মাধ্যমে পাঠাই।
আমরা একটি নতুন টাইপও ব্যবহার করি: ReceiverStream
, যা trpl::channel
থেকে rx
রিসিভারকে next
মেথড সহ একটি Stream
-এ রূপান্তরিত করে। main
-এ ফিরে, আমরা স্ট্রীম থেকে সমস্ত বার্তা প্রিন্ট করতে একটি while let
লুপ ব্যবহার করি।
যখন আমরা এই কোডটি চালাই, আমরা ঠিক সেই ফলাফল পাই যা আমরা আশা করেছিলাম:
Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
আবারও, আমরা এটি নিয়মিত Receiver
API বা এমনকি নিয়মিত Iterator
API দিয়েও করতে পারতাম, তাই আসুন এমন একটি ফিচার যোগ করি যার জন্য স্ট্রীম প্রয়োজন: স্ট্রীমের প্রতিটি আইটেমের জন্য একটি টাইমআউট যোগ করা, এবং আমরা যে আইটেমগুলি নির্গত করি সেগুলিতে একটি বিলম্ব যোগ করা, যেমনটি লিস্টিং 17-34-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
আমরা timeout
মেথড দিয়ে স্ট্রীমে একটি টাইমআউট যোগ করে শুরু করি, যা StreamExt
ট্রেইট থেকে আসে। তারপর আমরা while let
লুপের বডি আপডেট করি, কারণ স্ট্রীমটি এখন একটি Result
রিটার্ন করে। Ok
ভ্যারিয়েন্টটি নির্দেশ করে যে একটি বার্তা সময়মতো এসেছে; Err
ভ্যারিয়েন্টটি নির্দেশ করে যে কোনো বার্তা আসার আগে টাইমআউট শেষ হয়ে গেছে। আমরা সেই ফলাফলের উপর match
করি এবং হয় সফলভাবে বার্তা পেলে সেটি প্রিন্ট করি অথবা টাইমআউট সম্পর্কে একটি বিজ্ঞপ্তি প্রিন্ট করি। অবশেষে, লক্ষ্য করুন যে আমরা টাইমআউট প্রয়োগ করার পরে বার্তাগুলি পিন করি, কারণ টাইমআউট হেল্পার একটি স্ট্রীম তৈরি করে যা পোল করার জন্য পিন করা প্রয়োজন।
তবে, বার্তাগুলির মধ্যে কোনো বিলম্ব না থাকায়, এই টাইমআউটটি প্রোগ্রামের আচরণ পরিবর্তন করে না। আসুন আমরা যে বার্তাগুলি পাঠাই সেগুলিতে একটি পরিবর্তনশীল বিলম্ব যোগ করি, যেমনটি লিস্টিং 17-35-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) }
get_messages
-এ, আমরা messages
অ্যারের সাথে enumerate
ইটারেটর মেথড ব্যবহার করি যাতে আমরা আইটেমটির সাথে আমরা যে প্রতিটি আইটেম পাঠাচ্ছি তার ইনডেক্স পেতে পারি। তারপর আমরা জোড়-ইনডেক্স আইটেমগুলিতে ১০০-মিলিসেকেন্ড বিলম্ব এবং বিজোড়-ইনডেক্স আইটেমগুলিতে ৩০০-মিলিসেকেন্ড বিলম্ব প্রয়োগ করি যাতে বাস্তব জগতে আমরা একটি বার্তা স্ট্রীম থেকে যে বিভিন্ন বিলম্ব দেখতে পারি তা অনুকরণ করা যায়। যেহেতু আমাদের টাইমআউট ২০০ মিলিসেকেন্ডের জন্য, এটি অর্ধেক বার্তাগুলিকে প্রভাবিত করা উচিত।
get_messages
ফাংশনে বার্তাগুলির মধ্যে স্লিপ করার জন্য ব্লকিং ছাড়াই, আমাদের async ব্যবহার করতে হবে। যাইহোক, আমরা get_messages
নিজেই একটি async ফাংশন করতে পারি না, কারণ তাহলে আমরা Stream<Item = String>>
-এর পরিবর্তে একটি Future<Output = Stream<Item = String>>
রিটার্ন করতাম। কলারকে স্ট্রীমে অ্যাক্সেস পেতে get_messages
নিজেই await করতে হতো। কিন্তু মনে রাখবেন: একটি নির্দিষ্ট ফিউচারের মধ্যে সবকিছু রৈখিকভাবে ঘটে; কনকারেন্সি ফিউচারগুলির মধ্যে ঘটে। get_messages
await করার জন্য রিসিভার স্ট্রীম রিটার্ন করার আগে প্রতিটি বার্তার মধ্যে স্লিপ বিলম্ব সহ সমস্ত বার্তা পাঠাতে হতো। ফলস্বরূপ, টাইমআউটটি অকেজো হয়ে যেত। স্ট্রীমে নিজেই কোনো বিলম্ব থাকত না; স্ট্রীমটি উপলব্ধ হওয়ার আগেই সেগুলি ঘটত।
পরিবর্তে, আমরা get_messages
-কে একটি নিয়মিত ফাংশন হিসাবে রেখে দিই যা একটি স্ট্রীম রিটার্ন করে, এবং আমরা async sleep
কলগুলি পরিচালনা করার জন্য একটি টাস্ক তৈরি করি।
দ্রষ্টব্য: এইভাবে
spawn_task
কল করা কাজ করে কারণ আমরা ইতিমধ্যে আমাদের রানটাইম সেট আপ করেছি; যদি আমরা না করতাম, এটি একটি প্যানিক (panic) সৃষ্টি করত। অন্যান্য ইমপ্লিমেন্টেশনগুলি বিভিন্ন ট্রেড-অফ বেছে নেয়: তারা একটি নতুন রানটাইম তৈরি করতে পারে এবং প্যানিক এড়াতে পারে কিন্তু সামান্য অতিরিক্ত ওভারহেডের সাথে শেষ হতে পারে, অথবা তারা রানটাইমের রেফারেন্স ছাড়াই টাস্ক তৈরি করার জন্য একটি স্বতন্ত্র উপায় সরবরাহ নাও করতে পারে। নিশ্চিত করুন যে আপনি জানেন আপনার রানটাইম কোন ট্রেড-অফ বেছে নিয়েছে এবং সেই অনুযায়ী আপনার কোড লিখুন!
এখন আমাদের কোডের একটি অনেক বেশি আকর্ষণীয় ফলাফল রয়েছে। প্রতি দুটি বার্তার মধ্যে, একটি Problem: Elapsed(())
ত্রুটি দেখা যায়।
Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'
টাইমআউটটি শেষ পর্যন্ত বার্তাগুলিকে আসতে বাধা দেয় না। আমরা এখনও সমস্ত মূল বার্তা পাই, কারণ আমাদের চ্যানেলটি আনবাউন্ডেড (unbounded): এটি মেমরিতে যতগুলি বার্তা ফিট করতে পারে ততগুলি ধরে রাখতে পারে। যদি বার্তাটি টাইমআউটের আগে না আসে, আমাদের স্ট্রীম হ্যান্ডলার সেটির হিসাব রাখবে, কিন্তু যখন এটি আবার স্ট্রীমটি পোল করবে, তখন বার্তাটি এখন এসে থাকতে পারে।
প্রয়োজনে আপনি অন্যান্য ধরনের চ্যানেল বা সাধারণভাবে অন্যান্য ধরনের স্ট্রীম ব্যবহার করে ভিন্ন আচরণ পেতে পারেন। আসুন সেগুলির মধ্যে একটি বাস্তবে দেখি সময় ব্যবধানের একটি স্ট্রীমকে এই বার্তাগুলির স্ট্রীমের সাথে একত্রিত করে।
স্ট্রীম মার্জ করা (Merging Streams)
প্রথমে, আসুন আরেকটি স্ট্রীম তৈরি করি, যা সরাসরি চালালে প্রতি মিলিসেকেন্ডে একটি আইটেম নির্গত করবে। সরলতার জন্য, আমরা একটি বিলম্ব সহ একটি বার্তা পাঠাতে sleep
ফাংশন ব্যবহার করতে পারি এবং এটিকে get_messages
-এ ব্যবহৃত একই পদ্ধতির সাথে একত্রিত করতে পারি যেখানে একটি চ্যানেল থেকে একটি স্ট্রীম তৈরি করা হয়েছিল। পার্থক্য হলো এইবার, আমরা অতিবাহিত ব্যবধানের সংখ্যা ফেরত পাঠাতে যাচ্ছি, তাই রিটার্ন টাইপ হবে impl Stream<Item = u32>
, এবং আমরা ফাংশনটিকে get_intervals
বলতে পারি (দেখুন লিস্টিং 17-36)।
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
আমরা টাস্কের মধ্যে একটি count
সংজ্ঞায়িত করে শুরু করি। (আমরা এটিকে টাস্কের বাইরেও সংজ্ঞায়িত করতে পারতাম, তবে যেকোনো প্রদত্ত ভেরিয়েবলের স্কোপ সীমিত রাখা পরিষ্কার।) তারপর আমরা একটি অসীম লুপ তৈরি করি। লুপের প্রতিটি ইটারেশন অ্যাসিঙ্ক্রোনাসভাবে এক মিলিসেকেন্ড ঘুমায়, কাউন্ট বৃদ্ধি করে, এবং তারপর এটি চ্যানেলের মাধ্যমে পাঠায়। যেহেতু এই সবকিছুই spawn_task
দ্বারা তৈরি করা টাস্কের মধ্যে মোড়ানো, তাই এর সবকিছু—অসীম লুপ সহ—রানটাইমের সাথে পরিষ্কার হয়ে যাবে।
এই ধরনের অসীম লুপ, যা কেবল তখনই শেষ হয় যখন পুরো রানটাইমটি ভেঙে যায়, async রাস্টে বেশ সাধারণ: অনেক প্রোগ্রামের অনির্দিষ্টকালের জন্য চলতে থাকতে হয়। async-এর সাথে, এটি অন্য কিছু ব্লক করে না, যতক্ষণ না লুপের প্রতিটি ইটারেশনে অন্তত একটি await পয়েন্ট থাকে।
এখন, আমাদের মূল ফাংশনের async ব্লকে ফিরে, আমরা messages
এবং intervals
স্ট্রীমগুলিকে মার্জ করার চেষ্টা করতে পারি, যেমনটি লিস্টিং 17-37-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
while let Some(result) = merged.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
আমরা get_intervals
কল করে শুরু করি। তারপর আমরা messages
এবং intervals
স্ট্রীমগুলিকে merge
মেথড দিয়ে মার্জ করি, যা একাধিক স্ট্রীমকে একটি স্ট্রীমে একত্রিত করে যা যেকোনো সোর্স স্ট্রীম থেকে আইটেমগুলি উপলব্ধ হওয়ার সাথে সাথেই তৈরি করে, কোনো নির্দিষ্ট ক্রম আরোপ না করে। অবশেষে, আমরা messages
-এর পরিবর্তে সেই একত্রিত স্ট্রীমের উপর লুপ করি।
এই মুহুর্তে, messages
বা intervals
কোনোটিকেই পিন বা মিউটেবল (mutable) করার প্রয়োজন নেই, কারণ উভয়ই একক merged
স্ট্রীমে একত্রিত হবে। যাইহোক, merge
-এর এই কলটি কম্পাইল হয় না! (while let
লুপে next
কলটিও হয় না, তবে আমরা সেটিতে পরে ফিরে আসব।) এর কারণ হলো দুটি স্ট্রীমের বিভিন্ন টাইপ রয়েছে। messages
স্ট্রীমটির টাইপ Timeout<impl Stream<Item = String>>
, যেখানে Timeout
হলো সেই টাইপ যা একটি timeout
কলের জন্য Stream
ইমপ্লিমেন্ট করে। intervals
স্ট্রীমটির টাইপ impl Stream<Item = u32>
। এই দুটি স্ট্রীমকে মার্জ করতে, আমাদের একটিকে অন্যটির সাথে মেলানোর জন্য রূপান্তর করতে হবে। আমরা intervals
স্ট্রীমটি পুনরায় কাজ করব, কারণ messages
ইতিমধ্যে আমাদের কাঙ্ক্ষিত মৌলিক বিন্যাসে রয়েছে এবং টাইমআউট ত্রুটিগুলি পরিচালনা করতে হবে (দেখুন লিস্টিং 17-38)।
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
.map(|count| format!("Interval: {count}"))
.timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
while let Some(result) = stream.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
প্রথমত, আমরা intervals
-কে একটি স্ট্রিং-এ রূপান্তর করতে map
হেল্পার মেথড ব্যবহার করতে পারি। দ্বিতীয়ত, আমাদের messages
থেকে Timeout
-এর সাথে মেলাতে হবে। যেহেতু আমরা আসলে intervals
-এর জন্য কোনো টাইমআউট চাই না, তাই আমরা কেবল একটি টাইমআউট তৈরি করতে পারি যা আমাদের ব্যবহার করা অন্যান্য সময়কালের চেয়ে দীর্ঘ। এখানে, আমরা Duration::from_secs(10)
দিয়ে একটি ১০-সেকেন্ডের টাইমআউট তৈরি করি। অবশেষে, আমাদের stream
-কে মিউটেবল করতে হবে, যাতে while let
লুপের next
কলগুলি স্ট্রীমের মধ্য দিয়ে ইটারেট করতে পারে, এবং এটিকে পিন করতে হবে যাতে এটি করা নিরাপদ হয়। এটি আমাদের প্রায় যেখানে পৌঁছানো দরকার সেখানে নিয়ে যায়। সবকিছু টাইপ চেক করে। তবে আপনি যদি এটি চালান, দুটি সমস্যা হবে। প্রথমত, এটি কখনই থামবে না! আপনাকে ctrl-c দিয়ে এটি থামাতে হবে। দ্বিতীয়ত, ইংরেজি বর্ণমালার বার্তাগুলি সমস্ত ব্যবধান কাউন্টার বার্তাগুলির মধ্যে চাপা পড়ে যাবে:
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
লিস্টিং 17-39 এই শেষ দুটি সমস্যার সমাধান করার একটি উপায় দেখায়।
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval: {count}")) .throttle(Duration::from_millis(100)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
প্রথমে, আমরা intervals
স্ট্রীমে throttle
মেথড ব্যবহার করি যাতে এটি messages
স্ট্রীমকে অভিভূত না করে। থ্রটলিং (throttling) হলো একটি ফাংশন কত ঘন ঘন কল করা হবে—অথবা, এই ক্ষেত্রে, স্ট্রীমটি কত ঘন ঘন পোল করা হবে—তা সীমিত করার একটি উপায়। প্রতি ১০০ মিলিসেকেন্ডে একবার করলেই হবে, কারণ আমাদের বার্তাগুলি প্রায় তত ঘন ঘনই আসে।
আমরা একটি স্ট্রীম থেকে কতগুলি আইটেম গ্রহণ করব তা সীমিত করতে, আমরা merged
স্ট্রীমে take
মেথড প্রয়োগ করি, কারণ আমরা কেবল একটি স্ট্রীম বা অন্যটিকে নয়, চূড়ান্ত আউটপুট সীমিত করতে চাই।
এখন যখন আমরা প্রোগ্রামটি চালাই, এটি স্ট্রীম থেকে ২০টি আইটেম নেওয়ার পরে থেমে যায়, এবং ব্যবধানগুলি বার্তাগুলিকে অভিভূত করে না। আমরা Interval: 100
বা Interval: 200
ইত্যাদিও পাই না, বরং Interval: 1
, Interval: 2
, ইত্যাদি পাই—যদিও আমাদের কাছে একটি সোর্স স্ট্রীম আছে যা প্রতি মিলিসেকেন্ডে একটি ইভেন্ট তৈরি করতে পারে। এর কারণ হলো throttle
কল একটি নতুন স্ট্রীম তৈরি করে যা মূল স্ট্রীমটিকে র্যাপ করে যাতে মূল স্ট্রীমটি কেবল থ্রটল হারে পোল করা হয়, তার নিজস্ব "নেটিভ" হারে নয়। আমাদের কাছে একগুচ্ছ অব্যবহৃত ব্যবধান বার্তা নেই যা আমরা উপেক্ষা করার সিদ্ধান্ত নিচ্ছি। পরিবর্তে, আমরা প্রথম স্থানে সেই ব্যবধান বার্তাগুলি তৈরিই করি না! এটি রাস্টের ফিউচারের অন্তর্নিহিত "অলসতা" আবার কাজে লাগছে, যা আমাদের পারফরম্যান্স বৈশিষ্ট্যগুলি বেছে নেওয়ার সুযোগ দেয়।
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
আমাদের শেষ একটি জিনিস সামলাতে হবে: ত্রুটি! এই দুটি চ্যানেল-ভিত্তিক স্ট্রীমের সাথে, চ্যানেলের অন্য দিকটি বন্ধ হয়ে গেলে send
কলগুলি ব্যর্থ হতে পারে—এবং এটি কেবল রানটাইম কীভাবে স্ট্রীম গঠনকারী ফিউচারগুলি চালায় তার উপর নির্ভর করে। এখন পর্যন্ত, আমরা unwrap
কল করে এই সম্ভাবনাটি উপেক্ষা করেছি, কিন্তু একটি সুশৃঙ্খল অ্যাপে, আমাদের স্পষ্টভাবে ত্রুটিটি পরিচালনা করা উচিত, ন্যূনতমভাবে লুপটি শেষ করে যাতে আমরা আর কোনো বার্তা পাঠানোর চেষ্টা না করি। লিস্টিং 17-40 একটি সাধারণ ত্রুটি কৌশল দেখায়: সমস্যাটি প্রিন্ট করা এবং তারপর লুপগুলি থেকে break
করা।
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval #{count}")) .throttle(Duration::from_millis(500)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(item) => println!("{item}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; if let Err(send_error) = tx.send(format!("Message: '{message}'")) { eprintln!("Cannot send message '{message}': {send_error}"); break; } } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; if let Err(send_error) = tx.send(count) { eprintln!("Could not send interval {count}: {send_error}"); break; }; } }); ReceiverStream::new(rx) }``` </Listing> যথারীতি, একটি বার্তা পাঠানোর ত্রুটি পরিচালনা করার সঠিক উপায় পরিবর্তিত হবে; শুধু নিশ্চিত করুন যে আপনার একটি কৌশল আছে। এখন যেহেতু আমরা বাস্তবে অনেক async দেখেছি, আসুন এক ধাপ পিছিয়ে গিয়ে `Future`, `Stream`, এবং রাস্ট async কাজ করানোর জন্য যে অন্যান্য মূল ট্রেইটগুলি ব্যবহার করে সেগুলির কিছু বিশদ বিবরণে যাই। [17-02-messages]: ch17-02-concurrency-with-async.html#message-passing [iterator-trait]: ch13-02-iterators.html#the-iterator-trait-and-the-next-method