Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Async দিয়ে কনকারেন্সি প্রয়োগ

এই বিভাগে, আমরা চ্যাপ্টার ১৬-তে থ্রেড (thread) দিয়ে সমাধান করা কিছু কনকারেন্সি চ্যালেঞ্জের জন্য async প্রয়োগ করব। যেহেতু আমরা সেখানে অনেক মূল ধারণা নিয়ে ইতিমধ্যে আলোচনা করেছি, তাই এই বিভাগে আমরা থ্রেড এবং future-এর মধ্যে কী কী পার্থক্য রয়েছে তার উপর মনোযোগ দেব।

অনেক ক্ষেত্রে, async ব্যবহার করে কনকারেন্সির সাথে কাজ করার জন্য API-গুলো থ্রেড ব্যবহারের জন্য থাকা API-গুলোর মতোই। অন্য ক্ষেত্রে, সেগুলো বেশ ভিন্ন হয়ে যায়। এমনকি যখন API-গুলো থ্রেড এবং async-এর মধ্যে দেখতে একই রকম মনে হয়, তখনও তাদের আচরণ প্রায়শই ভিন্ন হয়—এবং তাদের পারফরম্যান্স বৈশিষ্ট্যগুলি প্রায় সবসময়ই ভিন্ন থাকে।

spawn_task দিয়ে নতুন টাস্ক তৈরি করা

Creating a New Thread with Spawn-এ আমরা প্রথম যে অপারেশনটি নিয়ে কাজ করেছিলাম তা হলো দুটি পৃথক থ্রেডে গণনা করা। আসুন async ব্যবহার করে একই কাজ করি। trpl crate একটি spawn_task ফাংশন সরবরাহ করে যা দেখতে thread::spawn API-এর মতোই, এবং একটি sleep ফাংশন যা thread::sleep API-এর একটি async সংস্করণ। আমরা এই দুটিকে একসাথে ব্যবহার করে গণনার উদাহরণটি বাস্তবায়ন করতে পারি, যেমনটি লিস্টিং ১৭-৬-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

আমাদের সূচনা বিন্দু হিসাবে, আমরা আমাদের main ফাংশনটি trpl::run দিয়ে সেট আপ করি যাতে আমাদের টপ-লেভেল ফাংশনটি async হতে পারে।

দ্রষ্টব্য: এই অধ্যায়ের এই পয়েন্ট থেকে, প্রতিটি উদাহরণে main-এ trpl::run সহ এই একই র‍্যাপিং কোড অন্তর্ভুক্ত থাকবে, তাই আমরা প্রায়শই এটি এড়িয়ে যাব যেমনটি আমরা main-এর ক্ষেত্রে করি। আপনার কোডে এটি অন্তর্ভুক্ত করতে ভুলবেন না!

তারপর আমরা সেই ব্লকের মধ্যে দুটি লুপ লিখি, প্রতিটিতে একটি trpl::sleep কল রয়েছে, যা পরবর্তী বার্তা পাঠানোর আগে আধা সেকেন্ড (৫০০ মিলিসেকেন্ড) অপেক্ষা করে। আমরা একটি লুপ trpl::spawn_task-এর বডিতে এবং অন্যটি একটি টপ-লেভেল for লুপে রাখি। আমরা sleep কলগুলোর পরে একটি await যোগ করি।

এই কোডটি থ্রেড-ভিত্তিক ইমপ্লিমেন্টেশনের মতোই আচরণ করে—যার মধ্যে এই ঘটনাটিও অন্তর্ভুক্ত যে আপনি যখন এটি চালাবেন তখন আপনার নিজের টার্মিনালে বার্তাগুলি একটি ভিন্ন ক্রমে উপস্থিত হতে পারে:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!```

এই সংস্করণটি প্রধান async ব্লকের বডিতে `for` লুপ শেষ হওয়ার সাথে সাথেই থেমে যায়, কারণ `spawn_task` দ্বারা তৈরি করা টাস্কটি `main` ফাংশন শেষ হলে বন্ধ হয়ে যায়। আপনি যদি এটিকে টাস্কের সমাপ্তি পর্যন্ত চালাতে চান, তবে প্রথম টাস্কটি সম্পূর্ণ হওয়ার জন্য অপেক্ষা করতে আপনাকে একটি জয়েন হ্যান্ডেল ব্যবহার করতে হবে। থ্রেডের সাথে, আমরা থ্রেডটি চলা শেষ না হওয়া পর্যন্ত "ব্লক" করার জন্য `join` মেথড ব্যবহার করেছি। লিস্টিং ১৭-৭-এ, আমরা একই কাজ করার জন্য `await` ব্যবহার করতে পারি, কারণ টাস্ক হ্যান্ডেল নিজেই একটি future। এর `Output` টাইপ একটি `Result`, তাই আমরা এটিকে await করার পরে `unwrap` করি।

<Listing number="17-7" caption="একটি টাস্ককে সমাপ্তি পর্যন্ত চালানোর জন্য একটি জয়েন হ্যান্ডেলের সাথে `await` ব্যবহার করা" file-name="src/main.rs">

```rust
# extern crate trpl; // required for mdbook test
# 
# use std::time::Duration;
# 
# fn main() {
#     trpl::run(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
#     });
# }

এই আপডেট করা সংস্করণটি উভয় লুপ শেষ না হওয়া পর্যন্ত চলে।

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

এখন পর্যন্ত, মনে হচ্ছে async এবং থ্রেড আমাদের একই মৌলিক ফলাফল দেয়, শুধু ভিন্ন সিনট্যাক্স দিয়ে: জয়েন হ্যান্ডেলে join কল করার পরিবর্তে await ব্যবহার করা, এবং sleep কলগুলিকে await করা।

বড় পার্থক্য হলো এই কাজটি করার জন্য আমাদের অন্য একটি অপারেটিং সিস্টেম থ্রেড তৈরি করার প্রয়োজন হয়নি। আসলে, আমাদের এখানে একটি টাস্কও তৈরি করার প্রয়োজন নেই। যেহেতু async ব্লকগুলি নামহীন future-এ কম্পাইল হয়, আমরা প্রতিটি লুপকে একটি async ব্লকে রাখতে পারি এবং রানটাইমকে trpl::join ফাংশন ব্যবহার করে সেগুলিকে সম্পূর্ণ করতে দিতে পারি।

Waiting for All Threads to Finishing Using join Handles বিভাগে, আমরা দেখিয়েছিলাম কীভাবে std::thread::spawn কল করার সময় রিটার্ন করা JoinHandle টাইপের উপর join মেথড ব্যবহার করতে হয়। trpl::join ফাংশনটি একই রকম, কিন্তু future-এর জন্য। যখন আপনি এটিকে দুটি future দেন, তখন এটি একটি নতুন future তৈরি করে যার আউটপুট হলো একটি টাপল (tuple) যা আপনার পাস করা প্রতিটি future-এর আউটপুট ধারণ করে যখন তারা উভয়ই সম্পূর্ণ হয়। সুতরাং, লিস্টিং ১৭-৮-এ, আমরা fut1 এবং fut2 উভয়ই শেষ হওয়ার জন্য অপেক্ষা করতে trpl::join ব্যবহার করি। আমরা fut1 এবং fut2-কে await করি না, বরং trpl::join দ্বারা উৎপাদিত নতুন future-কে await করি। আমরা আউটপুট উপেক্ষা করি, কারণ এটি কেবল দুটি ইউনিট মান ধারণকারী একটি টাপল।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}

যখন আমরা এটি চালাই, আমরা দেখি উভয় future-ই সম্পূর্ণভাবে চলে:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

এখন, আপনি প্রতিবার ঠিক একই ক্রম দেখতে পাবেন, যা আমরা থ্রেডের সাথে যা দেখেছিলাম তার থেকে খুব ভিন্ন। এর কারণ হলো trpl::join ফাংশনটি ফেয়ার (fair), যার অর্থ এটি প্রতিটি future-কে সমানভাবে পরীক্ষা করে, তাদের মধ্যে পর্যায়ক্রমে পরিবর্তন করে, এবং অন্যটি প্রস্তুত থাকলে একটিকে এগিয়ে যেতে দেয় না। থ্রেডের সাথে, অপারেটিং সিস্টেম সিদ্ধান্ত নেয় কোন থ্রেডটি পরীক্ষা করতে হবে এবং এটিকে কতক্ষণ চালাতে দিতে হবে। async রাস্টের সাথে, রানটাইম সিদ্ধান্ত নেয় কোন টাস্কটি পরীক্ষা করতে হবে। (বাস্তবে, বিশদ বিবরণগুলি জটিল হয়ে যায় কারণ একটি async রানটাইম কনকারেন্সি পরিচালনার অংশ হিসাবে পর্দার আড়ালে অপারেটিং সিস্টেম থ্রেড ব্যবহার করতে পারে, তাই ফেয়ারনেস নিশ্চিত করা একটি রানটাইমের জন্য আরও বেশি কাজ হতে পারে—তবে এটি এখনও সম্ভব!) রানটাইমগুলিকে যেকোনো প্রদত্ত অপারেশনের জন্য ফেয়ারনেসের গ্যারান্টি দিতে হয় না, এবং তারা প্রায়শই আপনাকে ফেয়ারনেস চান কি না তা বেছে নিতে বিভিন্ন API অফার করে।

ফিউচার await করার এই ভিন্নতাগুলো চেষ্টা করে দেখুন এবং দেখুন তারা কী করে:

  • যেকোনো একটি বা উভয় লুপের চারপাশ থেকে async ব্লকটি সরিয়ে ফেলুন।
  • প্রতিটি async ব্লক সংজ্ঞায়িত করার সাথে সাথেই await করুন।
  • কেবলমাত্র প্রথম লুপটিকে একটি async ব্লকে র‍্যাপ করুন, এবং দ্বিতীয় লুপের বডির পরে ফলাফলস্বরূপ future-টিকে await করুন।

একটি অতিরিক্ত চ্যালেঞ্জের জন্য, দেখুন আপনি প্রতিটি ক্ষেত্রে কোড চালানোর আগে আউটপুট কী হবে তা বের করতে পারেন কিনা!

মেসেজ পাসিং ব্যবহার করে দুটি টাস্কে গণনা করা

Future-এর মধ্যে ডেটা শেয়ার করাও পরিচিত মনে হবে: আমরা আবার মেসেজ পাসিং ব্যবহার করব, কিন্তু এবার async সংস্করণ টাইপ এবং ফাংশনের সাথে। আমরা Using Message Passing to Transfer Data Between Threads-এ যে পথ নিয়েছিলাম তার থেকে কিছুটা ভিন্ন পথ নেব যাতে থ্রেড-ভিত্তিক এবং future-ভিত্তিক কনকারেন্সির মধ্যে কিছু মূল পার্থক্য তুলে ধরা যায়। লিস্টিং ১৭-৯-এ, আমরা কেবল একটি async ব্লক দিয়ে শুরু করব—একটি পৃথক থ্রেড তৈরি করার মতো করে একটি পৃথক টাস্ক তৈরি না করে

extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("received '{received}'");
    });
}

এখানে, আমরা trpl::channel ব্যবহার করি, যা চ্যাপ্টার ১৬-তে আমরা থ্রেডের সাথে ব্যবহার করা মাল্টিপল-প্রডিউসার, সিঙ্গেল-কনজিউমার চ্যানেল API-এর একটি async সংস্করণ। API-এর async সংস্করণটি থ্রেড-ভিত্তিক সংস্করণ থেকে সামান্য ভিন্ন: এটি একটি অপরিবর্তনীয় রিসিভার rx-এর পরিবর্তে একটি পরিবর্তনযোগ্য রিসিভার ব্যবহার করে, এবং এর recv মেথড সরাসরি মান তৈরি করার পরিবর্তে একটি future তৈরি করে যা আমাদের await করতে হবে। এখন আমরা সেন্ডার থেকে রিসিভারে মেসেজ পাঠাতে পারি। লক্ষ্য করুন যে আমাদের একটি পৃথক থ্রেড বা এমনকি একটি টাস্কও তৈরি করতে হবে না; আমাদের কেবল rx.recv কলটি await করতে হবে।

std::mpsc::channel-এর সিঙ্ক্রোনাস Receiver::recv মেথডটি একটি মেসেজ না পাওয়া পর্যন্ত ব্লক করে। trpl::Receiver::recv মেথডটি তা করে না, কারণ এটি async। ব্লক করার পরিবর্তে, এটি রানটাইমের কাছে নিয়ন্ত্রণ ফিরিয়ে দেয় যতক্ষণ না একটি মেসেজ পাওয়া যায় বা চ্যানেলের সেন্ড সাইড বন্ধ হয়ে যায়। এর বিপরীতে, আমরা send কলটি await করি না, কারণ এটি ব্লক করে না। এটির প্রয়োজন নেই, কারণ আমরা যে চ্যানেলে পাঠাচ্ছি তা আনবাউন্ডেড (unbounded)।

দ্রষ্টব্য: যেহেতু এই সমস্ত async কোড একটি trpl::run কলের মধ্যে একটি async ব্লকে চলে, তাই এর মধ্যে সবকিছুই ব্লকিং এড়াতে পারে। যাইহোক, এর বাইরের কোডটি run ফাংশন রিটার্ন করার উপর ব্লক করবে। এটাই trpl::run ফাংশনের মূল উদ্দেশ্য: এটি আপনাকে বেছে নিতে দেয় কোথায় কিছু async কোডের সেটের উপর ব্লক করতে হবে, এবং এইভাবে সিঙ্ক এবং async কোডের মধ্যে কোথায় রূপান্তর করতে হবে। বেশিরভাগ async রানটাইমে, run-এর নাম আসলে ঠিক এই কারণেই block_on

এই উদাহরণ সম্পর্কে দুটি জিনিস লক্ষ্য করুন। প্রথমত, বার্তাটি সাথে সাথেই পৌঁছে যাবে। দ্বিতীয়ত, যদিও আমরা এখানে একটি future ব্যবহার করি, এখনও কোনো কনকারেন্সি নেই। লিস্টিংয়ের সবকিছু ক্রমানুসারে ঘটে, ঠিক যেমনটি হতো যদি কোনো future জড়িত না থাকত।

আসুন প্রথম অংশটি মোকাবেলা করি একটি সিরিজের বার্তা পাঠিয়ে এবং তাদের মধ্যে sleep করে, যেমনটি লিস্টিং ১৭-১০-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}

বার্তা পাঠানোর পাশাপাশি, আমাদের সেগুলি গ্রহণ করতে হবে। এই ক্ষেত্রে, যেহেতু আমরা জানি কতগুলি বার্তা আসছে, আমরা চারবার rx.recv().await কল করে এটি ম্যানুয়ালি করতে পারতাম। তবে বাস্তব জগতে, আমরা সাধারণত কিছু অজানা সংখ্যক বার্তার জন্য অপেক্ষা করব, তাই আমাদের আর কোনো বার্তা নেই তা নির্ধারণ না করা পর্যন্ত অপেক্ষা করতে হবে।

লিস্টিং ১৬-১০-এ, আমরা একটি সিঙ্ক্রোনাস চ্যানেল থেকে প্রাপ্ত সমস্ত আইটেম প্রসেস করার জন্য একটি for লুপ ব্যবহার করেছি। তবে রাস্টের কাছে এখনও আইটেমের একটি অ্যাসিঙ্ক্রোনাস সিরিজের উপর for লুপ লেখার কোনো উপায় নেই, তাই আমাদের এমন একটি লুপ ব্যবহার করতে হবে যা আমরা আগে দেখিনি: while let কন্ডিশনাল লুপ। এটি হলো if let কনস্ট্রাক্টের লুপ সংস্করণ যা আমরা Concise Control Flow with if let and let else বিভাগে দেখেছি। লুপটি ততক্ষণ চলতে থাকবে যতক্ষণ এর নির্দিষ্ট করা প্যাটার্নটি মানের সাথে মিলতে থাকবে।

rx.recv কল একটি future তৈরি করে, যা আমরা await করি। রানটাইম future-টিকে প্রস্তুত না হওয়া পর্যন্ত পজ (pause) করবে। একবার একটি বার্তা এসে গেলে, future-টি যতবার একটি বার্তা আসবে ততবার Some(message)-এ রিজলভ হবে। যখন চ্যানেলটি বন্ধ হয়ে যাবে, কোনো বার্তা এসেছে কি না নির্বিশেষে, future-টি পরিবর্তে None রিজলভ করবে এটি নির্দেশ করার জন্য যে আর কোনো মান নেই এবং তাই আমাদের পোলিং বন্ধ করা উচিত—অর্থাৎ, await করা বন্ধ করা উচিত।

while let লুপ এই সবকিছুকে একত্রিত করে। যদি rx.recv().await কল করার ফলাফল Some(message) হয়, আমরা বার্তাটিতে অ্যাক্সেস পাই এবং আমরা এটি লুপের বডিতে ব্যবহার করতে পারি, যেমনটি আমরা if let-এর সাথে করতে পারতাম। যদি ফলাফলটি None হয়, লুপটি শেষ হয়ে যায়। প্রতিবার লুপটি সম্পূর্ণ হলে, এটি আবার await পয়েন্টে আঘাত করে, তাই রানটাইম এটিকে আবার পজ করে যতক্ষণ না অন্য একটি বার্তা আসে।

কোডটি এখন সফলভাবে সমস্ত বার্তা পাঠায় এবং গ্রহণ করে। দুর্ভাগ্যবশত, এখনও কয়েকটি সমস্যা রয়েছে। একটি হলো, বার্তাগুলি আধা-সেকেন্ডের ব্যবধানে আসে না। প্রোগ্রাম শুরু করার ২ সেকেন্ড (২,০০০ মিলিসেকেন্ড) পরে সেগুলি একবারে আসে। আরেকটি হলো, এই প্রোগ্রামটি কখনও শেষ হয় না! পরিবর্তে, এটি নতুন বার্তার জন্য চিরকাল অপেক্ষা করে। আপনাকে ctrl-c ব্যবহার করে এটি বন্ধ করতে হবে।

আসুন প্রথমে পরীক্ষা করে দেখি কেন বার্তাগুলি পুরো বিলম্বের পরে একবারে আসে, প্রতিটিটির মধ্যে বিলম্ব সহ আসার পরিবর্তে। একটি নির্দিষ্ট async ব্লকের মধ্যে, কোডে await কিওয়ার্ডগুলি যে ক্রমে উপস্থিত হয়, প্রোগ্রাম চলার সময় সেগুলি সেই ক্রমেই কার্যকর হয়।

লিস্টিং ১৭-১০-এ কেবল একটি async ব্লক রয়েছে, তাই এর মধ্যে সবকিছু রৈখিকভাবে (linearly) চলে। এখনও কোনো কনকারেন্সি নেই। সমস্ত tx.send কল ঘটে, যার মধ্যে সমস্ত trpl::sleep কল এবং তাদের সংশ্লিষ্ট await পয়েন্টগুলি মিশে থাকে। শুধুমাত্র তারপরেই while let লুপ recv কলগুলির কোনো await পয়েন্টের মধ্য দিয়ে যেতে পারে।

আমরা যে আচরণটি চাই তা পেতে, যেখানে প্রতিটি বার্তার মধ্যে sleep বিলম্ব ঘটে, আমাদের tx এবং rx অপারেশনগুলিকে তাদের নিজস্ব async ব্লকে রাখতে হবে, যেমনটি লিস্টিং ১৭-১১-এ দেখানো হয়েছে। তারপরে রানটাইম trpl::join ব্যবহার করে সেগুলির প্রতিটি আলাদাভাবে চালাতে পারে, ঠিক গণনার উদাহরণের মতো। আবারও, আমরা trpl::join কল করার ফলাফল await করি, পৃথক future-গুলিকে নয়। যদি আমরা পৃথক future-গুলিকে ক্রমানুসারে await করতাম, আমরা কেবল একটি ক্রমিক প্রবাহে ফিরে আসতাম—ঠিক যা আমরা না করার চেষ্টা করছি।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

লিস্টিং ১৭-১১-এর আপডেট করা কোড দিয়ে, বার্তাগুলি ২ সেকেন্ড পরে একবারে আসার পরিবর্তে ৫০০-মিলিসেকেন্ডের ব্যবধানে প্রিন্ট হয়।

প্রোগ্রামটি এখনও কখনও শেষ হয় না, কারণ while let লুপটি trpl::join-এর সাথে যেভাবে ইন্টারঅ্যাক্ট করে তার জন্য:

  • trpl::join থেকে রিটার্ন করা future-টি তখনই সম্পূর্ণ হয় যখন এটিতে পাস করা উভয় future সম্পূর্ণ হয়।
  • tx future-টি vals-এর শেষ বার্তাটি পাঠানোর পরে স্লিপ করা শেষ হলে সম্পূর্ণ হয়।
  • rx future-টি while let লুপ শেষ না হওয়া পর্যন্ত সম্পূর্ণ হবে না।
  • while let লুপটি rx.recv await করা None তৈরি না করা পর্যন্ত শেষ হবে না।
  • rx.recv await করা তখনই None রিটার্ন করবে যখন চ্যানেলের অন্য প্রান্তটি বন্ধ হয়ে যাবে।
  • চ্যানেলটি তখনই বন্ধ হবে যদি আমরা rx.close কল করি বা যখন সেন্ডার সাইড, tx, ড্রপ (drop) হয়ে যায়।
  • আমরা কোথাও rx.close কল করি না, এবং tx ড্রপ হবে না যতক্ষণ না trpl::run-কে পাস করা সবচেয়ে বাইরের async ব্লকটি শেষ হয়।
  • ব্লকটি শেষ হতে পারে না কারণ এটি trpl::join সম্পূর্ণ হওয়ার উপর ব্লক হয়ে আছে, যা আমাদের এই তালিকার শীর্ষে ফিরিয়ে নিয়ে যায়।

আমরা rx.close কল করে ম্যানুয়ালি rx বন্ধ করতে পারতাম, কিন্তু এর খুব একটা মানে হয় না। একটি নির্বিচারে সংখ্যক বার্তা পরিচালনা করার পরে থামলে প্রোগ্রামটি বন্ধ হয়ে যাবে, কিন্তু আমরা বার্তা মিস করতে পারি। আমাদের নিশ্চিত করার জন্য অন্য কোনো উপায় প্রয়োজন যাতে tx ফাংশনটির শেষ হওয়ার আগে ড্রপ হয়ে যায়।

এখন, আমরা যে async ব্লকে বার্তা পাঠাই তা কেবল tx ধার (borrow) করে কারণ একটি বার্তা পাঠানোর জন্য মালিকানার (ownership) প্রয়োজন হয় না, কিন্তু যদি আমরা tx-কে সেই async ব্লকের মধ্যে মুভ (move) করতে পারতাম, তবে সেই ব্লকটি শেষ হয়ে গেলে এটি ড্রপ হয়ে যেত। চ্যাপ্টার ১৩-এর Capturing References or Moving Ownership বিভাগে, আপনি শিখেছেন কীভাবে ক্লোজারের সাথে move কিওয়ার্ড ব্যবহার করতে হয়, এবং, চ্যাপ্টার ১৬-এর Using move Closures with Threads বিভাগে আলোচনা করা হয়েছে, থ্রেডের সাথে কাজ করার সময় আমাদের প্রায়শই ক্লোজারের মধ্যে ডেটা মুভ করতে হয়। একই মৌলিক গতিবিদ্যা async ব্লকগুলিতেও প্রযোজ্য, তাই move কিওয়ার্ড async ব্লকগুলির সাথে ঠিক সেভাবেই কাজ করে যেমনটি ক্লোজারের সাথে করে।

লিস্টিং ১৭-১২-এ, আমরা বার্তা পাঠানোর জন্য ব্যবহৃত ব্লকটিকে async থেকে async move-এ পরিবর্তন করি। যখন আমরা কোডের এই সংস্করণটি চালাই, তখন শেষ বার্তাটি পাঠানো এবং গ্রহণ করার পরে এটি সুন্দরভাবে বন্ধ হয়ে যায়।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

এই async চ্যানেলটিও একটি মাল্টিপল-প্রডিউসার চ্যানেল, তাই আমরা যদি একাধিক future থেকে বার্তা পাঠাতে চাই তবে tx-এর উপর clone কল করতে পারি, যেমনটি লিস্টিং ১৭-১৩-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}

প্রথমে, আমরা tx ক্লোন করি, প্রথম async ব্লকের বাইরে tx1 তৈরি করি। আমরা tx1-কে সেই ব্লকের মধ্যে মুভ করি ঠিক যেমনটি আমরা আগে tx-এর সাথে করেছি। তারপরে, পরে, আমরা মূল tx-কে একটি নতুন async ব্লকে মুভ করি, যেখানে আমরা সামান্য ধীর গতিতে আরও বার্তা পাঠাই। আমরা ঘটনাচক্রে এই নতুন async ব্লকটি বার্তা গ্রহণের জন্য async ব্লকের পরে রাখি, তবে এটি এর আগেও থাকতে পারত। মূল বিষয় হলো future-গুলি কোন ক্রমে await করা হয়, কোন ক্রমে সেগুলি তৈরি করা হয় তা নয়।

বার্তা পাঠানোর জন্য উভয় async ব্লককেই async move ব্লক হতে হবে যাতে tx এবং tx1 উভয়ই সেই ব্লকগুলি শেষ হলে ড্রপ হয়ে যায়। অন্যথায়, আমরা সেই একই অসীম লুপে ফিরে যাব যা দিয়ে আমরা শুরু করেছিলাম। অবশেষে, আমরা অতিরিক্ত future-টি পরিচালনা করার জন্য trpl::join থেকে trpl::join3-এ স্যুইচ করি।

এখন আমরা উভয় সেন্ডিং future থেকে সমস্ত বার্তা দেখতে পাই, এবং যেহেতু সেন্ডিং future-গুলি পাঠানোর পরে সামান্য ভিন্ন বিলম্ব ব্যবহার করে, বার্তাগুলিও সেই ভিন্ন ব্যবধানে গৃহীত হয়।

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

এটি একটি ভালো শুরু, কিন্তু এটি আমাদের কেবল কয়েকটি future-এর মধ্যে সীমাবদ্ধ রাখে: join-এর সাথে দুটি, বা join3-এর সাথে তিনটি। আসুন দেখি আমরা কীভাবে আরও future-এর সাথে কাজ করতে পারি।