স্ট্রিমস: সিকোয়েন্সে ফিউচার (Streams: Futures in Sequence)
এই চ্যাপ্টারে ఇప్పటి পর্যন্ত, আমরা বেশিরভাগ ক্ষেত্রে individual ফিউচারের মধ্যেই আটকে ছিলাম। একটি বড় ব্যতিক্রম ছিল অ্যাসিঙ্ক্রোনাস চ্যানেল যা আমরা ব্যবহার করেছি। এই চ্যাপ্টারের “Message Passing” বিভাগে আমরা কীভাবে আমাদের অ্যাসিঙ্ক্রোনাস চ্যানেলের জন্য রিসিভার ব্যবহার করেছি তা স্মরণ করুন। অ্যাসিঙ্ক্রোনাস recv
মেথড সময়ের সাথে আইটেমগুলির একটি সিকোয়েন্স তৈরি করে। এটি স্ট্রিম নামে পরিচিত আরও অনেক সাধারণ প্যাটার্নের একটি উদাহরণ।
আমরা Chapter 13-এর The Iterator Trait and the next
Method বিভাগে Iterator
trait দেখার সময় আইটেমগুলির একটি সিকোয়েন্স দেখেছিলাম, কিন্তু ইটারেটর এবং অ্যাসিঙ্ক্রোনাস চ্যানেল রিসিভারের মধ্যে দুটি পার্থক্য রয়েছে। প্রথম পার্থক্য হল সময়: ইটারেটরগুলি সিঙ্ক্রোনাস, যেখানে চ্যানেল রিসিভার অ্যাসিঙ্ক্রোনাস। দ্বিতীয়টি হল API। Iterator
-এর সাথে সরাসরি কাজ করার সময়, আমরা এর সিঙ্ক্রোনাস next
মেথড কল করি। বিশেষ করে trpl::Receiver
স্ট্রিমের সাথে, আমরা পরিবর্তে একটি অ্যাসিঙ্ক্রোনাস recv
মেথড কল করেছি। অন্যথায়, এই API গুলি খুব একই রকম মনে হয় এবং সেই মিলটি কোনও কাকতালীয় ঘটনা নয়। একটি স্ট্রিম হল ইটারেশনের একটি অ্যাসিঙ্ক্রোনাস ফর্মের মতো। যেখানে trpl::Receiver
বিশেষভাবে মেসেজ পাওয়ার জন্য অপেক্ষা করে, যদিও, সাধারণ-উদ্দেশ্যের স্ট্রিম API অনেক বিস্তৃত: এটি Iterator
-এর মতোই পরবর্তী আইটেম সরবরাহ করে, কিন্তু অ্যাসিঙ্ক্রোনাসভাবে।
Rust-এ ইটারেটর এবং স্ট্রিমের মধ্যে মিলের অর্থ হল আমরা আসলে যেকোনো ইটারেটর থেকে একটি স্ট্রিম তৈরি করতে পারি। একটি ইটারেটরের মতো, আমরা একটি স্ট্রিমের next
মেথড কল করে এবং তারপর আউটপুটের জন্য অপেক্ষা করে একটি স্ট্রিমের সাথে কাজ করতে পারি, যেমনটি Listing 17-30-এ রয়েছে।
extern crate trpl; // required for mdbook test
fn main() {
trpl::run(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
আমরা সংখ্যার একটি অ্যারে দিয়ে শুরু করি, যেটিকে আমরা একটি ইটারেটরে রূপান্তর করি এবং তারপর সমস্ত মান দ্বিগুণ করতে map
কল করি। তারপর আমরা trpl::stream_from_iter
ফাংশন ব্যবহার করে ইটারেটরটিকে একটি স্ট্রিমে রূপান্তর করি। এরপর, আমরা while let
লুপ দিয়ে স্ট্রিমে আইটেমগুলি আসার সাথে সাথে সেগুলির উপর লুপ করি।
দুর্ভাগ্যবশত, যখন আমরা কোডটি চালানোর চেষ্টা করি, তখন এটি কম্পাইল হয় না, তবে পরিবর্তে এটি রিপোর্ট করে যে কোনও next
মেথড উপলব্ধ নেই:
error[E0599]: no method named `next` found for struct `Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= note: the full type name has been written to '/Users/chris/dev/rust-lang/book/main/listings/ch17-async-await/listing-17-30/target/debug/deps/async_await-575db3dd3197d257.long-type-14490787947592691573.txt'
= note: consider using `--verbose` to print the full type name to the console
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
এই আউটপুটটি যেমন ব্যাখ্যা করে, কম্পাইলার error-এর কারণ হল next
মেথডটি ব্যবহার করতে সক্ষম হওয়ার জন্য আমাদের স্কোপে সঠিক trait-এর প্রয়োজন। আমাদের এখন পর্যন্ত আলোচনা দেওয়া হলে, আপনি যুক্তিসঙ্গতভাবে আশা করতে পারেন যে trait টি হবে Stream
, কিন্তু এটি আসলে StreamExt
। এক্সটেনশনের জন্য সংক্ষিপ্ত, Ext
হল Rust কমিউনিটিতে একটি trait-কে অন্যটির সাথে প্রসারিত করার জন্য একটি সাধারণ প্যাটার্ন।
আমরা চ্যাপ্টারের শেষে Stream
এবং StreamExt
trait গুলিকে আরও একটু বিশদে ব্যাখ্যা করব, কিন্তু আপাতত আপনার যা জানা দরকার তা হল Stream
trait একটি নিম্ন-স্তরের ইন্টারফেস সংজ্ঞায়িত করে যা কার্যকরভাবে Iterator
এবং Future
trait গুলিকে একত্রিত করে। StreamExt
Stream
-এর উপরে API-গুলির একটি উচ্চ-স্তরের সেট সরবরাহ করে, যার মধ্যে next
মেথড এবং সেইসাথে Iterator
trait দ্বারা প্রদত্ত ইউটিলিটি মেথডগুলির অনুরূপ অন্যান্য ইউটিলিটি মেথড রয়েছে। Stream
এবং StreamExt
এখনও Rust-এর স্ট্যান্ডার্ড লাইব্রেরির অংশ নয়, তবে বেশিরভাগ ইকোসিস্টেম ক্রেট একই সংজ্ঞা ব্যবহার করে।
কম্পাইলার error-এর সমাধান হল trpl::StreamExt
-এর জন্য একটি use
স্টেটমেন্ট যুক্ত করা, যেমনটি Listing 17-31-এ রয়েছে।
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let iter = values.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(value) = stream.next().await { println!("The value was: {value}"); } }); }
এই সমস্ত টুকরোগুলি একসাথে রাখলে, এই কোডটি আমরা যেভাবে চাই সেভাবে কাজ করে! আরও কী, এখন যেহেতু আমাদের স্কোপে StreamExt
রয়েছে, আমরা এর সমস্ত ইউটিলিটি মেথড ব্যবহার করতে পারি, ঠিক ইটারেটরের মতোই। উদাহরণস্বরূপ, Listing 17-32-এ, আমরা তিন এবং পাঁচের গুণিতক ব্যতীত অন্য সবকিছু ফিল্টার করতে filter
মেথড ব্যবহার করি।
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = 1..101; let iter = values.map(|n| n * 2); let stream = trpl::stream_from_iter(iter); let mut filtered = stream.filter(|value| value % 3 == 0 || value % 5 == 0); while let Some(value) = filtered.next().await { println!("The value was: {value}"); } }); }
অবশ্যই, এটি খুব আকর্ষণীয় নয়, যেহেতু আমরা সাধারণ ইটারেটর দিয়ে এবং কোনও অ্যাসিঙ্ক্রোনাস ছাড়াই একই কাজ করতে পারি। আসুন দেখি আমরা কী করতে পারি যা স্ট্রিমের জন্য অনন্য।
কম্পোজিং স্ট্রিম (Composing Streams)
অনেকগুলি ধারণা স্বাভাবিকভাবেই স্ট্রিম হিসাবে উপস্থাপিত হয়: একটি সারিতে উপলব্ধ হওয়া আইটেম, ফাইল সিস্টেম থেকে ক্রমবর্ধমানভাবে ডেটার অংশগুলি টেনে আনা যখন সম্পূর্ণ ডেটা সেট কম্পিউটারের মেমরির জন্য খুব বড় হয়, অথবা সময়ের সাথে নেটওয়ার্কের মাধ্যমে ডেটা আসা। যেহেতু স্ট্রিমগুলি ফিউচার, তাই আমরা সেগুলিকে অন্য যেকোনো ধরনের ফিউচারের সাথে ব্যবহার করতে পারি এবং সেগুলিকে আকর্ষণীয় উপায়ে একত্রিত করতে পারি। উদাহরণস্বরূপ, আমরা অনেকগুলি নেটওয়ার্ক কল ট্রিগার করা এড়াতে ইভেন্টগুলিকে ব্যাচ আপ করতে পারি, দীর্ঘ-চলমান অপারেশনগুলির সিকোয়েন্সে টাইমআউট সেট করতে পারি, অথবা অপ্রয়োজনীয় কাজ এড়াতে ইউজার ইন্টারফেস ইভেন্টগুলিকে থ্রোটল করতে পারি।
আসুন একটি ওয়েব সকেট বা অন্য রিয়েল-টাইম কমিউনিকেশন প্রোটোকল থেকে আমরা যে ডেটা স্ট্রিম দেখতে পারি তার জন্য একটি স্ট্যান্ড-ইন হিসাবে মেসেজের একটি ছোট স্ট্রিম তৈরি করে শুরু করি, যেমনটি Listing 17-33-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = get_messages(); while let Some(message) = messages.next().await { println!("{message}"); } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
প্রথমে, আমরা get_messages
নামে একটি ফাংশন তৈরি করি যা impl Stream<Item = String>
রিটার্ন করে। এর ইমপ্লিমেন্টেশনের জন্য, আমরা একটি অ্যাসিঙ্ক্রোনাস চ্যানেল তৈরি করি, ইংরেজি বর্ণমালার প্রথম 10টি অক্ষরের উপর লুপ করি এবং সেগুলিকে চ্যানেলের মাধ্যমে পাঠাই।
আমরা একটি নতুন টাইপও ব্যবহার করি: ReceiverStream
, যা trpl::channel
থেকে rx
রিসিভারকে একটি next
মেথড সহ একটি Stream
-এ রূপান্তর করে। main
-এ ফিরে, আমরা স্ট্রিম থেকে সমস্ত মেসেজ প্রিন্ট করতে একটি while let
লুপ ব্যবহার করি।
যখন আমরা এই কোডটি চালাই, তখন আমরা ঠিক সেই ফলাফলগুলি পাই যা আমরা আশা করব:
Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
আবার, আমরা এটি রেগুলার Receiver
API বা এমনকি রেগুলার Iterator
API দিয়ে করতে পারি, যদিও, তাই আসুন এমন একটি বৈশিষ্ট্য যুক্ত করি যার জন্য স্ট্রিমের প্রয়োজন: স্ট্রীমের প্রতিটি আইটেমে প্রযোজ্য একটি টাইমআউট এবং আমরা যে আইটেমগুলি নির্গত করি তাতে একটি বিলম্ব যোগ করা, যেমনটি Listing 17-34-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
আমরা timeout
মেথড দিয়ে স্ট্রিমে একটি টাইমআউট যুক্ত করে শুরু করি, যা StreamExt
trait থেকে আসে। তারপর আমরা while let
লুপের বডি আপডেট করি, কারণ স্ট্রিমটি এখন একটি Result
রিটার্ন করে। Ok
ভেরিয়েন্ট নির্দেশ করে যে একটি মেসেজ সময়মতো এসেছে; Err
ভেরিয়েন্ট নির্দেশ করে যে কোনও মেসেজ আসার আগেই টাইমআউট শেষ হয়ে গেছে। আমরা সেই ফলাফলের উপর match
করি এবং হয় মেসেজটি সফলভাবে পেলে প্রিন্ট করি অথবা টাইমআউট সম্পর্কে একটি নোটিশ প্রিন্ট করি। অবশেষে, লক্ষ্য করুন যে আমরা টাইমআউট প্রয়োগ করার পরে মেসেজগুলিকে পিন করি, কারণ টাইমআউট হেল্পার একটি স্ট্রিম তৈরি করে যেটিকে পোল করার জন্য পিন করা প্রয়োজন।
যাইহোক, যেহেতু মেসেজগুলির মধ্যে কোনও বিলম্ব নেই, তাই এই টাইমআউট প্রোগ্রামের আচরণ পরিবর্তন করে না। আসুন আমরা যে মেসেজগুলি পাঠাই তাতে একটি পরিবর্তনশীল বিলম্ব যুক্ত করি, যেমনটি Listing 17-35-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) }
get_messages
-এ, আমরা messages
অ্যারের সাথে enumerate
ইটারেটর মেথড ব্যবহার করি যাতে আমরা যে প্রতিটি আইটেম পাঠাচ্ছি তার ইনডেক্স এবং সেইসাথে আইটেমটিও পেতে পারি। তারপর আমরা বাস্তব জগতে মেসেজের একটি স্ট্রিম থেকে আমরা যে বিভিন্ন বিলম্ব দেখতে পারি তা অনুকরণ করতে জোড়-ইনডেক্স আইটেমগুলিতে 100-মিলিসেকেন্ড বিলম্ব এবং বিজোড়-ইনডেক্স আইটেমগুলিতে 300-মিলিসেকেন্ড বিলম্ব প্রয়োগ করি। যেহেতু আমাদের টাইমআউট 200 মিলিসেকেন্ডের জন্য, তাই এটি অর্ধেক মেসেজকে প্রভাবিত করবে।
get_messages
ফাংশনে মেসেজগুলির মধ্যে স্লিপ করার জন্য ব্লক না করে, আমাদের অ্যাসিঙ্ক্রোনাস ব্যবহার করতে হবে। যাইহোক, আমরা get_messages
-কে নিজেই একটি অ্যাসিঙ্ক্রোনাস ফাংশন করতে পারি না, কারণ তাহলে আমরা একটি Stream<Item = String>>
-এর পরিবর্তে একটি Future<Output = Stream<Item = String>>
রিটার্ন করব। কলারকে স্ট্রিমটিতে অ্যাক্সেস পেতে get_messages
-এর জন্য নিজেই অপেক্ষা করতে হবে। কিন্তু মনে রাখবেন: একটি প্রদত্ত ফিউচারের মধ্যে সবকিছু লিনিয়ারভাবে ঘটে; কনকারেন্সি ঘটে ফিউচারগুলির মধ্যে। get_messages
-এর জন্য অপেক্ষা করার জন্য এটিকে সমস্ত মেসেজ পাঠাতে হবে, প্রতিটি মেসেজের মধ্যে স্লিপ বিলম্ব সহ, রিসিভার স্ট্রিম রিটার্ন করার আগে। ফলস্বরূপ, টাইমআউটটি অকেজো হবে। স্ট্রীমের মধ্যে কোনও বিলম্ব থাকবে না; সেগুলি স্ট্রিমটি উপলব্ধ হওয়ার আগেই ঘটবে।
পরিবর্তে, আমরা get_messages
-কে একটি রেগুলার ফাংশন হিসাবে ছেড়ে দিই যা একটি স্ট্রিম রিটার্ন করে এবং অ্যাসিঙ্ক্রোনাস sleep
কলগুলি পরিচালনা করার জন্য আমরা একটি টাস্ক স্পন করি।
Note: এইভাবে
spawn_task
কল করা কাজ করে কারণ আমরা ইতিমধ্যেই আমাদের রানটাইম সেট আপ করেছি; যদি আমরা তা না করতাম, তাহলে এটি একটি প্যানিকের কারণ হত। অন্যান্য ইমপ্লিমেন্টেশনগুলি বিভিন্ন ট্রেডঅফ বেছে নেয়: তারা একটি নতুন রানটাইম স্পন করতে পারে এবং প্যানিক এড়াতে পারে কিন্তু সামান্য অতিরিক্ত ওভারহেড সহ শেষ হতে পারে, অথবা তারা রানটাইমের রেফারেন্স ছাড়াই টাস্ক স্পন করার কোনও স্বতন্ত্র উপায় সরবরাহ নাও করতে পারে। আপনি নিশ্চিত করুন যে আপনার রানটাইম কোন ট্রেডঅফ বেছে নিয়েছে এবং সেই অনুযায়ী আপনার কোড লিখুন!
এখন আমাদের কোডের অনেক বেশি আকর্ষণীয় ফলাফল রয়েছে। অন্য প্রতিটি জোড়া মেসেজের মধ্যে, একটি Problem: Elapsed(())
error।
Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'
টাইমআউট শেষ পর্যন্ত মেসেজগুলিকে আসা থেকে আটকাতে পারে না। আমরা এখনও সমস্ত মূল মেসেজ পাই, কারণ আমাদের চ্যানেলটি আনবাউন্ডেড: এটি মেমরিতে যতগুলি মেসেজ ফিট করতে পারে ততগুলি ধারণ করতে পারে। যদি মেসেজটি টাইমআউটের আগে না আসে, তাহলে আমাদের স্ট্রিম হ্যান্ডলার সেটি বিবেচনা করবে, কিন্তু যখন এটি আবার স্ট্রিমটি পোল করবে, তখন মেসেজটি এসে যেতে পারে।
প্রয়োজনের ভিত্তিতে আপনি অন্যান্য ধরণের চ্যানেল বা আরও সাধারণভাবে অন্যান্য ধরণের স্ট্রিম ব্যবহার করে আলাদা আচরণ পেতে পারেন। আসুন তাদের মধ্যে একটিকে বাস্তবে দেখি, সময়ের ব্যবধানের একটি স্ট্রিমের সাথে এই মেসেজের স্ট্রিমটিকে একত্রিত করে।
স্ট্রিমগুলিকে মার্জ করা (Merging Streams)
প্রথমে, আসুন আরেকটি স্ট্রিম তৈরি করি, যেটি সরাসরি চলতে দিলে প্রতি মিলি সেকেন্ডে একটি আইটেম নির্গত করবে। সরলতার জন্য, আমরা একটি বিলম্বের সাথে একটি মেসেজ পাঠাতে sleep
ফাংশনটি ব্যবহার করতে পারি এবং এটিকে get_messages
-এ আমরা যে পদ্ধতি ব্যবহার করেছি তার সাথে একত্রিত করতে পারি একটি চ্যানেল থেকে একটি স্ট্রিম তৈরি করার জন্য। পার্থক্য হল যে এবার, আমরা যে ব্যবধানগুলি অতিক্রান্ত হয়েছে তার সংখ্যা ফেরত পাঠাব, তাই রিটার্ন টাইপ হবে impl Stream<Item = u32>
, এবং আমরা ফাংশনটিকে get_intervals
বলতে পারি (Listing 17-36 দেখুন)।
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
আমরা টাস্কের মধ্যে একটি count
সংজ্ঞায়িত করে শুরু করি। (আমরা এটিকে টাস্কের বাইরেও সংজ্ঞায়িত করতে পারি, তবে যেকোনো প্রদত্ত variable-এর সুযোগ সীমিত করা আরও পরিষ্কার।) তারপর আমরা একটি অসীম লুপ তৈরি করি। লুপের প্রতিটি পুনরাবৃত্তি অ্যাসিঙ্ক্রোনাসভাবে এক মিলি সেকেন্ডের জন্য স্লিপ করে, কাউন্ট বাড়ায় এবং তারপর এটিকে চ্যানেলের মাধ্যমে পাঠায়। যেহেতু এটি spawn_task
দ্বারা তৈরি টাস্কের মধ্যে র্যাপ করা হয়েছে, তাই অসীম লুপ সহ এটির সমস্ত কিছুই রানটাইমের সাথে পরিষ্কার হয়ে যাবে।
এই ধরনের অসীম লুপ, যা শুধুমাত্র তখনই শেষ হয় যখন পুরো রানটাইমটি ভেঙে যায়, অ্যাসিঙ্ক্রোনাস Rust-এ বেশ সাধারণ: অনেক প্রোগ্রামের অনির্দিষ্টকালের জন্য চলতে থাকা প্রয়োজন। অ্যাসিঙ্ক্রোনাসের সাথে, এটি অন্য কিছু ব্লক করে না, যতক্ষণ লুপের প্রতিটি পুনরাবৃত্তিতে কমপক্ষে একটি অ্যাওয়েট পয়েন্ট থাকে।
এখন, আমাদের main ফাংশনের অ্যাসিঙ্ক্রোনাস ব্লকে ফিরে, আমরা messages
এবং intervals
স্ট্রিমগুলিকে মার্জ করার চেষ্টা করতে পারি, যেমনটি Listing 17-37-এ দেখানো হয়েছে।
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
while let Some(result) = merged.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
আমরা get_intervals
কল করে শুরু করি। তারপর আমরা messages
এবং intervals
স্ট্রিমগুলিকে merge
মেথড দিয়ে মার্জ করি, যা একাধিক স্ট্রিমকে একটি স্ট্রিমে একত্রিত করে যা আইটেমগুলি উপলব্ধ হওয়ার সাথে সাথেই যেকোনো সোর্স স্ট্রিম থেকে আইটেম তৈরি করে, কোনও নির্দিষ্ট ক্রম আরোপ না করে। অবশেষে, আমরা সেই সম্মিলিত স্ট্রিমের উপর লুপ করি messages
-এর পরিবর্তে।
এই সময়ে, messages
বা intervals
কোনওটিরই পিন করা বা মিউটেবল হওয়ার প্রয়োজন নেই, কারণ উভয়কেই একক merged
স্ট্রিমে একত্রিত করা হবে। যাইহোক, merge
-এ এই কলটি কম্পাইল হয় না! (while let
লুপের next
কলটিও নয়, তবে আমরা সেটিতে ফিরে আসব।) এর কারণ হল দুটি স্ট্রিমের আলাদা টাইপ রয়েছে। messages
স্ট্রিমের টাইপ হল Timeout<impl Stream<Item = String>>
, যেখানে Timeout
হল সেই টাইপ যা একটি timeout
কলের জন্য Stream
ইমপ্লিমেন্ট করে। intervals
স্ট্রিমের টাইপ হল impl Stream<Item = u32>
। এই দুটি স্ট্রিম মার্জ করার জন্য, আমাদের তাদের মধ্যে একটিকে অন্যটির সাথে মেলানোর জন্য রূপান্তর করতে হবে। আমরা intervals স্ট্রিমটিকে পুনরায় কাজ করব, কারণ messages ইতিমধ্যেই আমাদের কাঙ্ক্ষিত বেসিক ফর্ম্যাটে রয়েছে এবং টাইমআউট error গুলিকে হ্যান্ডেল করতে হবে (Listing 17-38 দেখুন)।
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
.map(|count| format!("Interval: {count}"))
.timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
while let Some(result) = stream.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
প্রথমে, আমরা intervals
-কে একটি স্ট্রিং-এ রূপান্তর করতে map
হেল্পার মেথড ব্যবহার করতে পারি। দ্বিতীয়ত, আমাদের messages
থেকে Timeout
-এর সাথে মেলাতে হবে। যেহেতু আমরা আসলে intervals
-এর জন্য একটি টাইমআউট চাই না, যদিও, আমরা কেবল একটি টাইমআউট তৈরি করতে পারি যা আমরা যে অন্যান্য সময়কাল ব্যবহার করছি তার চেয়ে দীর্ঘ। এখানে, আমরা Duration::from_secs(10)
দিয়ে একটি 10-সেকেন্ডের টাইমআউট তৈরি করি। অবশেষে, আমাদের stream
কে মিউটেবল করতে হবে, যাতে while let
লুপের next
কলগুলি স্ট্রিমের মধ্য দিয়ে পুনরাবৃত্তি করতে পারে এবং এটিকে পিন করতে হবে যাতে এটি করা নিরাপদ হয়। এটি আমাদের প্রায় যেখানে আমাদের থাকা দরকার সেখানে পৌঁছে দেয়। সবকিছু টাইপ চেক করে। আপনি যদি এটি চালান, যদিও, দুটি সমস্যা হবে। প্রথমত, এটি কখনই বন্ধ হবে না! আপনাকে ctrl-c দিয়ে এটি বন্ধ করতে হবে। দ্বিতীয়ত, ইংরেজি বর্ণমালার মেসেজগুলি সমস্ত ইন্টারভাল কাউন্টার মেসেজের মধ্যে চাপা পড়ে যাবে:
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
Listing 17-39 এই শেষ দুটি সমস্যা সমাধানের একটি উপায় দেখায়।
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval: {count}")) .throttle(Duration::from_millis(100)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
প্রথমে, আমরা intervals
স্ট্রিমে throttle
মেথড ব্যবহার করি যাতে এটি messages
স্ট্রিমকে অভিভূত না করে। থ্রটলিং হল একটি ফাংশন কল করার হার সীমিত করার একটি উপায়—অথবা, এক্ষেত্রে, স্ট্রিমটি কত ঘন ঘন পোল করা হবে। প্রতি 100 মিলিসেকেন্ডে একবার যথেষ্ট হওয়া উচিত, কারণ আমাদের মেসেজগুলি প্রায় সেই সময়ে আসে।
আমরা একটি স্ট্রিম থেকে যে আইটেমগুলি গ্রহণ করব তার সংখ্যা সীমিত করতে, আমরা merged
স্ট্রিমে take
মেথড প্রয়োগ করি, কারণ আমরা চূড়ান্ত আউটপুট সীমিত করতে চাই, শুধুমাত্র একটি স্ট্রিম বা অন্যটি নয়।
এখন যখন আমরা প্রোগ্রামটি চালাই, তখন এটি স্ট্রিম থেকে 20টি আইটেম টানার পরে বন্ধ হয়ে যায় এবং ইন্টারভালগুলি মেসেজগুলিকে অভিভূত করে না। আমরা Interval: 100
বা Interval: 200
বা এই জাতীয় কিছু পাই না, তবে পরিবর্তে Interval: 1
, Interval: 2
ইত্যাদি পাই—এমনকি আমাদের একটি সোর্স স্ট্রিম থাকা সত্ত্বেও যা প্রতি মিলি সেকেন্ডে একটি ইভেন্ট তৈরি করতে পারে। এর কারণ হল throttle
কলটি একটি নতুন স্ট্রিম তৈরি করে যা মূল স্ট্রিমটিকে র্যাপ করে যাতে মূল স্ট্রিমটি শুধুমাত্র থ্রোটল হারে পোল করা হয়, তার নিজস্ব "নেটিভ" হারে নয়। আমাদের কাছে একগুচ্ছ আনহ্যান্ডেলড ইন্টারভাল মেসেজ নেই যা আমরা উপেক্ষা করতে বেছে নিচ্ছি। পরিবর্তে, আমরা সেই ইন্টারভাল মেসেজগুলি প্রথমেই তৈরি করি না! এটি হল Rust-এর ফিউচারের অন্তর্নিহিত "অলসতা", যা আমাদের পারফরম্যান্সের বৈশিষ্ট্যগুলি বেছে নিতে দেয়।
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
আমাদের শেষ একটি জিনিস হ্যান্ডেল করতে হবে: error! এই উভয় চ্যানেল-ভিত্তিক স্ট্রিমের সাথে, চ্যানেলের অন্য দিকটি বন্ধ হয়ে গেলে send
কলগুলি ব্যর্থ হতে পারে—এবং এটি কেবল রানটাইম কীভাবে স্ট্রিম তৈরি করা ফিউচারগুলিকে এক্সিকিউট করে তার বিষয়। ఇప్పటి অবধি, আমরা unwrap
কল করে এই সম্ভাবনাটিকে উপেক্ষা করেছি, কিন্তু একটি ভাল আচরণ করা অ্যাপে, আমাদের স্পষ্টভাবে error টি হ্যান্ডেল করা উচিত, অন্তত লুপটি শেষ করে যাতে আমরা আর কোনও মেসেজ পাঠানোর চেষ্টা না করি। Listing 17-40 একটি সহজ error কৌশল দেখায়: সমস্যাটি প্রিন্ট করুন এবং তারপর লুপগুলি থেকে break
করুন।
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval #{count}")) .throttle(Duration::from_millis(500)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(item) => println!("{item}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; if let Err(send_error) = tx.send(format!("Message: '{message}'")) { eprintln!("Cannot send message '{message}': {send_error}"); break; } } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; if let Err(send_error) = tx.send(count) { eprintln!("Could not send interval {count}: {send_error}"); break; }; } }); ReceiverStream::new(rx) }
যথারীতি, একটি মেসেজ পাঠানোর error হ্যান্ডেল করার সঠিক উপায়টি ভিন্ন হবে; শুধু নিশ্চিত করুন যে আপনার একটি কৌশল আছে।
এখন যেহেতু আমরা অনেকগুলি অ্যাসিঙ্ক্রোনাস বাস্তবে দেখেছি, আসুন এক ধাপ পিছিয়ে যাই এবং Future
, Stream
এবং Rust অ্যাসিঙ্ক্রোনাস কাজ করার জন্য যে অন্যান্য মূল trait গুলি ব্যবহার করে তার কয়েকটি বিশদ বিবরণে ডুব দিই।