অ্যাসিঙ্ক্রোনাস ব্যবহার করে কনকারেন্সি প্রয়োগ করা (Applying Concurrency with Async)

এই বিভাগে, আমরা Chapter 16-এ থ্রেড দিয়ে মোকাবিলা করা একই কনকারেন্সি চ্যালেঞ্জগুলির মধ্যে কয়েকটিতে অ্যাসিঙ্ক্রোনাস প্রয়োগ করব। যেহেতু আমরা ইতিমধ্যে সেখানে মূল ধারণাগুলির অনেকগুলি নিয়ে কথা বলেছি, তাই এই বিভাগে আমরা থ্রেড এবং ফিউচারের মধ্যে কী আলাদা তার উপর ফোকাস করব।

অনেক ক্ষেত্রে, অ্যাসিঙ্ক্রোনাস ব্যবহার করে কনকারেন্সির সাথে কাজ করার জন্য API গুলি থ্রেড ব্যবহার করার API-গুলির মতোই। অন্য ক্ষেত্রে, সেগুলি বেশ ভিন্ন হয়। এমনকি যখন API গুলি থ্রেড এবং অ্যাসিঙ্ক্রোনাসের মধ্যে দেখতে একই রকম দেখায়, তখনও তাদের প্রায়শই আলাদা আচরণ থাকে—এবং তাদের প্রায় সবসময়ই আলাদা পারফরম্যান্স বৈশিষ্ট্য থাকে।

spawn_task দিয়ে একটি নতুন টাস্ক তৈরি করা (Creating a New Task with spawn_task)

আমরা Creating a New Thread with Spawn-এ যে প্রথম অপারেশনটি মোকাবেলা করেছি তা হল দুটি পৃথক থ্রেডে গণনা করা। আসুন অ্যাসিঙ্ক্রোনাস ব্যবহার করে একই কাজ করি। trpl ক্রেট একটি spawn_task ফাংশন সরবরাহ করে যা thread::spawn API-এর মতোই এবং একটি sleep ফাংশন যা thread::sleep API-এর একটি অ্যাসিঙ্ক্রোনাস সংস্করণ। আমরা এইগুলিকে একসাথে ব্যবহার করে গণনার উদাহরণটি ইমপ্লিমেন্ট করতে পারি, যেমনটি Listing 17-6-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

আমাদের শুরুর পয়েন্ট হিসাবে, আমরা আমাদের main ফাংশনটিকে trpl::run দিয়ে সেট আপ করি যাতে আমাদের টপ-লেভেল ফাংশনটি অ্যাসিঙ্ক্রোনাস হতে পারে।

Note: এই চ্যাপ্টারের এখন থেকে সামনের দিকে, প্রতিটি উদাহরণে main-এ trpl::run সহ এই একই র‍্যাপিং কোড অন্তর্ভুক্ত থাকবে, তাই আমরা প্রায়শই এটিকে বাদ দেব যেমনটি আমরা main-এর সাথে করি। আপনার কোডে এটি অন্তর্ভুক্ত করতে ভুলবেন না!

তারপর আমরা সেই ব্লকের মধ্যে দুটি লুপ লিখি, প্রত্যেকটিতে একটি trpl::sleep কল রয়েছে, যা পরবর্তী মেসেজ পাঠানোর আগে আধা সেকেন্ড (500 মিলিসেকেন্ড) অপেক্ষা করে। আমরা একটি লুপ trpl::spawn_task-এর বডিতে রাখি এবং অন্যটি একটি টপ-লেভেল for লুপে রাখি। আমরা sleep কলের পরেও একটি await যুক্ত করি।

এই কোডটি থ্রেড-ভিত্তিক ইমপ্লিমেন্টেশনের মতোই আচরণ করে—এই সত্যটি সহ যে আপনি যখন এটি চালাবেন তখন আপনার নিজের টার্মিনালে মেসেজগুলি ভিন্ন ক্রমে প্রদর্শিত হতে পারে:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

এই সংস্করণটি main অ্যাসিঙ্ক্রোনাস ব্লকের বডির for লুপ শেষ হওয়ার সাথে সাথেই বন্ধ হয়ে যায়, কারণ main ফাংশন শেষ হলে spawn_task দ্বারা স্পন করা টাস্কটি বন্ধ হয়ে যায়। আপনি যদি এটিকে টাস্কটি সম্পূর্ণ হওয়া পর্যন্ত চালাতে চান তবে আপনাকে প্রথম টাস্কটি সম্পূর্ণ হওয়ার জন্য অপেক্ষা করতে একটি join handle ব্যবহার করতে হবে। থ্রেডের সাথে, আমরা থ্রেডটি চালানো শেষ না হওয়া পর্যন্ত “ব্লক” করতে join মেথড ব্যবহার করেছি। Listing 17-7-এ, আমরা একই কাজ করতে await ব্যবহার করতে পারি, কারণ টাস্ক হ্যান্ডেল নিজেই একটি ফিউচার। এর Output টাইপ হল একটি Result, তাই আমরা এটির জন্য অপেক্ষা করার পরেও এটিকে আনর‍্যাপ করি।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}

এই আপডেটেড সংস্করণটি উভয় লুপ শেষ না হওয়া পর্যন্ত চলে।

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

এখন পর্যন্ত, মনে হচ্ছে অ্যাসিঙ্ক্রোনাস এবং থ্রেড আমাদের একই বেসিক ফলাফল দেয়, শুধুমাত্র ভিন্ন সিনট্যাক্স সহ: join handle-এ join কল করার পরিবর্তে await ব্যবহার করা এবং sleep কলের জন্য অপেক্ষা করা।

বড় পার্থক্য হল এটি করার জন্য আমাদের অন্য অপারেটিং সিস্টেম থ্রেড স্পন করার প্রয়োজন হয়নি। আসলে, আমাদের এখানে একটি টাস্কও স্পন করার দরকার নেই। যেহেতু অ্যাসিঙ্ক্রোনাস ব্লকগুলি বেনামী ফিউচারে কম্পাইল হয়, তাই আমরা প্রতিটি লুপকে একটি অ্যাসিঙ্ক্রোনাস ব্লকে রাখতে পারি এবং trpl::join ফাংশন ব্যবহার করে রানটাইমকে উভয়কেই সম্পূর্ণ হওয়া পর্যন্ত চালাতে দিতে পারি।

Waiting for All Threads to Finishing Using join Handles বিভাগে, আমরা দেখিয়েছি কিভাবে আপনি std::thread::spawn কল করলে রিটার্ন করা JoinHandle টাইপের join মেথড ব্যবহার করবেন। trpl::join ফাংশনটি একই রকম, কিন্তু ফিউচারের জন্য। আপনি যখন এটিকে দুটি ফিউচার দেন, তখন এটি একটি একক নতুন ফিউচার তৈরি করে যার আউটপুট হল একটি টাপল যাতে আপনি যে প্রতিটি ফিউচার পাস করেছেন তার আউটপুট থাকে একবার সেগুলি উভয়ই সম্পূর্ণ হয়ে গেলে। সুতরাং, Listing 17-8-এ, আমরা fut1 এবং fut2 উভয়ের শেষ হওয়ার জন্য অপেক্ষা করতে trpl::join ব্যবহার করি। আমরা fut1 এবং fut2-এর জন্য অপেক্ষা করি না বরং trpl::join দ্বারা উৎপাদিত নতুন ফিউচারের জন্য অপেক্ষা করি। আমরা আউটপুট উপেক্ষা করি, কারণ এটি কেবল দুটি ইউনিট মান ধারণকারী একটি টাপল।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}

যখন আমরা এটি চালাই, তখন আমরা উভয় ফিউচার সম্পূর্ণ হওয়া পর্যন্ত চলতে দেখি:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

এখন, আপনি প্রতিবার একই ক্রম দেখতে পাবেন, যা আমরা থ্রেড দিয়ে যা দেখেছি তার থেকে খুব আলাদা। এর কারণ হল trpl::join ফাংশনটি ফেয়ার, অর্থাৎ এটি প্রতিটি ফিউচারকে সমানভাবে প্রায়শই পরীক্ষা করে, তাদের মধ্যে পরিবর্তন করে এবং অন্যটি প্রস্তুত থাকলে কখনই একটিকে এগিয়ে যেতে দেয় না। থ্রেডের সাথে, অপারেটিং সিস্টেম সিদ্ধান্ত নেয় কোন থ্রেডটি পরীক্ষা করতে হবে এবং কতক্ষণ চলতে দিতে হবে। অ্যাসিঙ্ক্রোনাস Rust-এর সাথে, রানটাইম সিদ্ধান্ত নেয় কোন টাস্কটি পরীক্ষা করতে হবে। (বাস্তবে, বিশদগুলি জটিল হয়ে যায় কারণ একটি অ্যাসিঙ্ক্রোনাস রানটাইম হুডের নিচে অপারেটিং সিস্টেম থ্রেডগুলিকে কনকারেন্সি পরিচালনার অংশ হিসাবে ব্যবহার করতে পারে, তাই ফেয়ারনেস গ্যারান্টি দেওয়া একটি রানটাইমের জন্য আরও বেশি কাজ হতে পারে—তবে এটি এখনও সম্ভব!) রানটাইমগুলিকে কোনও প্রদত্ত অপারেশনের জন্য ফেয়ারনেসের গ্যারান্টি দিতে হবে না এবং তারা প্রায়শই বিভিন্ন API সরবরাহ করে যাতে আপনি ফেয়ারনেস চান কিনা তা বেছে নিতে পারেন।

ফিউচারগুলির জন্য অপেক্ষা করার এই কিছু ভেরিয়েশন চেষ্টা করুন এবং দেখুন তারা কী করে:

  • যেকোনো একটি বা উভয় লুপের চারপাশ থেকে অ্যাসিঙ্ক্রোনাস ব্লকটি সরিয়ে দিন।
  • প্রতিটি অ্যাসিঙ্ক্রোনাস ব্লককে সংজ্ঞায়িত করার পরপরই এটির জন্য অপেক্ষা করুন।
  • শুধুমাত্র প্রথম লুপটিকে একটি অ্যাসিঙ্ক্রোনাস ব্লকে র‍্যাপ করুন এবং দ্বিতীয় লুপের বডির পরে ফলাফলের ফিউচারের জন্য অপেক্ষা করুন।

একটি অতিরিক্ত চ্যালেঞ্জের জন্য, কোড চালানোর আগে প্রতিটি ক্ষেত্রে আউটপুট কী হবে তা বের করার চেষ্টা করুন!

মেসেজ পাসিং ব্যবহার করে দুটি টাস্কে গণনা করা (Counting Up on Two Tasks Using Message Passing)

ফিউচারের মধ্যে ডেটা শেয়ার করাও পরিচিত হবে: আমরা আবার মেসেজ পাসিং ব্যবহার করব, কিন্তু এবার টাইপ এবং ফাংশনের অ্যাসিঙ্ক্রোনাস সংস্করণ সহ। থ্রেড-ভিত্তিক এবং ফিউচার-ভিত্তিক কনকারেন্সির মধ্যে কিছু মূল পার্থক্য বোঝানোর জন্য আমরা Using Message Passing to Transfer Data Between Threads-এ যা করেছি তার থেকে সামান্য ভিন্ন পথ নেব। Listing 17-9-এ, আমরা শুধুমাত্র একটি একক অ্যাসিঙ্ক্রোনাস ব্লক দিয়ে শুরু করব—একটি পৃথক থ্রেড স্পন করার মতো একটি পৃথক টাস্ক স্পন না করে।

extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("Got: {received}");
    });
}

এখানে, আমরা trpl::channel ব্যবহার করি, যা Chapter 16-এ থ্রেডের সাথে ব্যবহার করা মাল্টিপল-প্রডিউসার, সিঙ্গল-কনজিউমার চ্যানেল API-এর একটি অ্যাসিঙ্ক্রোনাস সংস্করণ। API-এর অ্যাসিঙ্ক্রোনাস সংস্করণটি থ্রেড-ভিত্তিক সংস্করণের থেকে সামান্য আলাদা: এটি একটি ইমিউটেবল রিসিভার rx-এর পরিবর্তে একটি মিউটেবল ব্যবহার করে এবং এর recv মেথড সরাসরি মান তৈরি করার পরিবর্তে আমাদের অপেক্ষা করতে হবে এমন একটি ফিউচার তৈরি করে। এখন আমরা সেন্ডার থেকে রিসিভারে মেসেজ পাঠাতে পারি। লক্ষ্য করুন যে আমাদের একটি পৃথক থ্রেড বা এমনকি একটি টাস্কও স্পন করতে হবে না; আমাদের কেবল rx.recv কলের জন্য অপেক্ষা করতে হবে।

std::mpsc::channel-এর সিঙ্ক্রোনাস Receiver::recv মেথড একটি মেসেজ না পাওয়া পর্যন্ত ব্লক করে। trpl::Receiver::recv মেথডটি তা করে না, কারণ এটি অ্যাসিঙ্ক্রোনাস। ব্লক করার পরিবর্তে, এটি রানটাইমে কন্ট্রোল ফিরিয়ে দেয় যতক্ষণ না হয় একটি মেসেজ রিসিভ করা হয় বা চ্যানেলের সেন্ড সাইড বন্ধ হয়ে যায়। বিপরীতে, আমরা send কলের জন্য অপেক্ষা করি না, কারণ এটি ব্লক করে না। এটির প্রয়োজন নেই, কারণ আমরা যে চ্যানেলে এটি পাঠাচ্ছি সেটি আনবাউন্ডেড।

Note: যেহেতু এই সমস্ত অ্যাসিঙ্ক্রোনাস কোড একটি trpl::run কলের মধ্যে একটি অ্যাসিঙ্ক্রোনাস ব্লকে চলে, তাই এর ভেতরের সবকিছু ব্লক করা এড়াতে পারে। যাইহোক, এর বাইরের কোডটি run ফাংশন রিটার্ন করার উপর ব্লক করবে। trpl::run ফাংশনের মূল উদ্দেশ্য হল এটি: এটি আপনাকে বেছে নিতে দেয় কোথায় অ্যাসিঙ্ক্রোনাস কোডের কিছু সেটের উপর ব্লক করতে হবে এবং এইভাবে কোথায় সিঙ্ক্রোনাস এবং অ্যাসিঙ্ক্রোনাস কোডের মধ্যে ট্রানজিশন করতে হবে। বেশিরভাগ অ্যাসিঙ্ক্রোনাস রানটাইমে, run-এর নাম আসলে block_on রাখা হয়েছে ঠিক এই কারণেই।

এই উদাহরণ সম্পর্কে দুটি জিনিস লক্ষ্য করুন। প্রথমত, মেসেজটি অবিলম্বে আসবে। দ্বিতীয়ত, যদিও আমরা এখানে একটি ফিউচার ব্যবহার করি, এখনও কোনও কনকারেন্সি নেই। লিস্টিং-এর সবকিছু ক্রমানুসারে ঘটে, ঠিক যেমন কোনও ফিউচার জড়িত না থাকলে হত।

আসুন Listing 17-10-এ দেখানো মতো করে মেসেজের একটি সিরিজ পাঠিয়ে এবং তাদের মধ্যে স্লিপ করে প্রথম অংশটি সমাধান করি।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}

মেসেজ পাঠানোর পাশাপাশি, আমাদের সেগুলি গ্রহণ করতে হবে। এক্ষেত্রে, যেহেতু আমরা জানি কতগুলি মেসেজ আসছে, তাই আমরা rx.recv().await চারবার কল করে ম্যানুয়ালি এটি করতে পারি। বাস্তব জগতে, যদিও, আমরা সাধারণত অজানা সংখ্যক মেসেজের জন্য অপেক্ষা করব, তাই আমাদের অপেক্ষা করতে হবে যতক্ষণ না আমরা নির্ধারণ করি যে আর কোনও মেসেজ নেই।

Listing 16-10-এ, আমরা একটি সিঙ্ক্রোনাস চ্যানেল থেকে প্রাপ্ত সমস্ত আইটেম প্রসেস করতে একটি for লুপ ব্যবহার করেছি। Rust-এ এখনও একটি for লুপ লেখার কোনও উপায় নেই আইটেমগুলির একটি অ্যাসিঙ্ক্রোনাস সিরিজের উপর, যাইহোক, তাই আমাদের একটি লুপ ব্যবহার করতে হবে যা আমরা আগে দেখিনি: while let কন্ডিশনাল লুপ। এটি Concise Control Flow with if let and let else বিভাগে আমরা যে if let গঠন দেখেছি তার লুপ সংস্করণ। এটি যে প্যাটার্নটি নির্দিষ্ট করে তা মানের সাথে মিলতে থাকলে লুপটি এক্সিকিউট করা চালিয়ে যাবে।

rx.recv কলটি একটি ফিউচার তৈরি করে, যার জন্য আমরা অপেক্ষা করি। রানটাইম ফিউচারটি প্রস্তুত না হওয়া পর্যন্ত এটিকে থামিয়ে রাখবে। একবার একটি মেসেজ এলে, ফিউচারটি যতবার একটি মেসেজ আসবে ততবার Some(message)-এ রেজলভ করবে। যখন চ্যানেলটি বন্ধ হয়ে যায়, কোনও মেসেজ এসেছে কিনা তা নির্বিশেষে, ফিউচারটি পরিবর্তে None-এ রেজলভ করবে যাতে নির্দেশ করে যে আর কোনও মান নেই এবং তাই আমাদের পোলিং বন্ধ করা উচিত—অর্থাৎ, অপেক্ষা করা বন্ধ করা উচিত।

while let লুপ এই সমস্ত কিছুকে একত্রিত করে। rx.recv().await কলের ফলাফল যদি Some(message) হয়, তাহলে আমরা মেসেজটিতে অ্যাক্সেস পাব এবং আমরা এটিকে লুপ বডিতে ব্যবহার করতে পারি, ঠিক যেমনটি আমরা if let-এর সাথে করতে পারতাম। যদি ফলাফলটি None হয়, তাহলে লুপটি শেষ হয়। প্রতিবার লুপটি সম্পূর্ণ হলে, এটি আবার অ্যাওয়েট পয়েন্টে আঘাত করে, তাই রানটাইম এটিকে আবার থামিয়ে দেয় যতক্ষণ না অন্য কোনও মেসেজ আসে।

কোডটি এখন সফলভাবে সমস্ত মেসেজ পাঠায় এবং গ্রহণ করে। দুর্ভাগ্যবশত, এখনও কয়েকটি সমস্যা রয়েছে। একটির জন্য, মেসেজগুলি আধা-সেকেন্ডের ব্যবধানে আসে না। সেগুলি আমাদের প্রোগ্রাম শুরু করার 2 সেকেন্ড (2,000 মিলিসেকেন্ড) পরে একবারে আসে। অন্যটির জন্য, এই প্রোগ্রামটিও কখনও শেষ হয় না! পরিবর্তে, এটি নতুন মেসেজের জন্য চিরকাল অপেক্ষা করে। আপনাকে ctrl-c ব্যবহার করে এটি বন্ধ করতে হবে।

আসুন প্রথমে পরীক্ষা করে দেখি কেন মেসেজগুলি প্রতিটিটির মধ্যে বিলম্ব সহ আসার পরিবর্তে সম্পূর্ণ বিলম্বের পরে একবারে আসে। একটি প্রদত্ত অ্যাসিঙ্ক্রোনাস ব্লকের মধ্যে, কোডে await কীওয়ার্ডগুলি যে ক্রমে প্রদর্শিত হয় সেই ক্রমেই প্রোগ্রামটি চলার সময় সেগুলি এক্সিকিউট করা হয়।

Listing 17-10-এ শুধুমাত্র একটি অ্যাসিঙ্ক্রোনাস ব্লক রয়েছে, তাই এর সবকিছু লিনিয়ারভাবে চলে। এখনও কোনও কনকারেন্সি নেই। সমস্ত tx.send কলগুলি ঘটে, সমস্ত trpl::sleep কল এবং তাদের সম্পর্কিত অ্যাওয়েট পয়েন্টগুলির সাথে ইন্টারস্পার্সড। শুধুমাত্র তখনই while let লুপ recv কলের অ্যাওয়েট পয়েন্টগুলির মধ্য দিয়ে যেতে পারে।

আমরা যে আচরণটি চাই তা পেতে, যেখানে প্রতিটি মেসেজের মধ্যে স্লিপ বিলম্ব ঘটে, আমাদের tx এবং rx অপারেশনগুলিকে তাদের নিজস্ব অ্যাসিঙ্ক্রোনাস ব্লকে রাখতে হবে, যেমনটি Listing 17-11-এ দেখানো হয়েছে। তারপর রানটাইম trpl::join ব্যবহার করে তাদের প্রত্যেকটিকে আলাদাভাবে এক্সিকিউট করতে পারে, ঠিক গণনার উদাহরণের মতো। আবারও, আমরা trpl::join কলের ফলাফলের জন্য অপেক্ষা করি, individual ফিউচারগুলির জন্য নয়। আমরা যদি individual ফিউচারগুলির জন্য ক্রমানুসারে অপেক্ষা করতাম, তাহলে আমরা একটি সিকোয়েন্সিয়াল ফ্লো-তে ফিরে যেতাম—ঠিক যা আমরা করার চেষ্টা করছি না

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

Listing 17-11-এর আপডেটেড কোডের সাথে, মেসেজগুলি 2 সেকেন্ড পরে তাড়াহুড়ো করে আসার পরিবর্তে 500-মিলিসেকেন্ডের ব্যবধানে প্রিন্ট করা হয়।

প্রোগ্রামটি এখনও শেষ হয় না, যদিও, while let লুপ যেভাবে trpl::join-এর সাথে ইন্টারঅ্যাক্ট করে তার কারণে:

  • trpl::join থেকে রিটার্ন করা ফিউচার তখনই সম্পূর্ণ হয় যখন উভয় ফিউচার এটিকে পাস করা হয়েছে সম্পূর্ণ হয়।
  • tx ফিউচারটি vals-এ শেষ মেসেজ পাঠানোর পরে স্লিপ শেষ করার পরে সম্পূর্ণ হয়।
  • rx ফিউচারটি while let লুপ শেষ না হওয়া পর্যন্ত সম্পূর্ণ হবে না।
  • while let লুপটি শেষ হবে না যতক্ষণ না rx.recv-এর জন্য অপেক্ষা করলে None পাওয়া যায়।
  • rx.recv-এর জন্য অপেক্ষা করলে None রিটার্ন করবে শুধুমাত্র চ্যানেলের অন্য প্রান্তটি বন্ধ হয়ে গেলে।
  • চ্যানেলটি বন্ধ হবে শুধুমাত্র যদি আমরা rx.close কল করি বা যখন সেন্ডার সাইড, tx, ড্রপ করা হয়।
  • আমরা কোথাও rx.close কল করি না এবং trpl::run-এ পাস করা বাইরের অ্যাসিঙ্ক্রোনাস ব্লকটি শেষ না হওয়া পর্যন্ত tx ড্রপ করা হবে না।
  • ব্লকটি শেষ হতে পারে না কারণ এটি trpl::join সম্পূর্ণ হওয়ার উপর ব্লক করা, যা আমাদের এই তালিকার শীর্ষে ফিরিয়ে নিয়ে যায়।

আমরা কোথাও rx.close কল করে ম্যানুয়ালি rx বন্ধ করতে পারি, তবে এটি খুব বেশি অর্থবোধক নয়। কিছু arbitrary সংখ্যক মেসেজ হ্যান্ডেল করার পরে বন্ধ করা প্রোগ্রামটি বন্ধ করে দেবে, কিন্তু আমরা মেসেজ মিস করতে পারি। আমাদের অন্য কোনও উপায়ের প্রয়োজন যাতে tx ফাংশনের শেষের আগে ড্রপ করা হয় তা নিশ্চিত করা যায়।

এখন, যে অ্যাসিঙ্ক্রোনাস ব্লকে আমরা মেসেজ পাঠাই সেটি শুধুমাত্র tx ধার করে কারণ একটি মেসেজ পাঠানোর জন্য ownership-এর প্রয়োজন হয় না, কিন্তু আমরা যদি tx কে সেই অ্যাসিঙ্ক্রোনাস ব্লকে move করতে পারতাম, তাহলে সেই ব্লকটি শেষ হয়ে গেলে এটি ড্রপ করা হত। Chapter 13-এর Capturing References or Moving Ownership বিভাগে, আপনি শিখেছেন কিভাবে ক্লোজারের সাথে move কীওয়ার্ড ব্যবহার করতে হয় এবং Chapter 16-এর Using move Closures with Threads বিভাগে আলোচনা করা হয়েছে, থ্রেডের সাথে কাজ করার সময় আমাদের প্রায়শই ডেটা ক্লোজারে move করতে হয়। একই বেসিক ডায়নামিকগুলি অ্যাসিঙ্ক্রোনাস ব্লকের ক্ষেত্রে প্রযোজ্য, তাই move কীওয়ার্ডটি অ্যাসিঙ্ক্রোনাস ব্লকের সাথে একইভাবে কাজ করে যেমনটি ক্লোজারের সাথে করে।

Listing 17-12-এ, আমরা মেসেজ পাঠানোর জন্য ব্যবহৃত ব্লকটিকে async থেকে async move-এ পরিবর্তন করি। যখন আমরা কোডের এই সংস্করণটি চালাই, তখন শেষ মেসেজটি পাঠানো এবং গ্রহণ করার পরে এটি সুন্দরভাবে বন্ধ হয়ে যায়।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

এই অ্যাসিঙ্ক্রোনাস চ্যানেলটিও একটি মাল্টিপল-প্রডিউসার চ্যানেল, তাই আমরা একাধিক ফিউচার থেকে মেসেজ পাঠাতে চাইলে tx-এ clone কল করতে পারি, যেমনটি Listing 17-13-এ দেখানো হয়েছে।

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}

প্রথমে, আমরা প্রথম অ্যাসিঙ্ক্রোনাস ব্লকের বাইরে tx ক্লোন করে tx1 তৈরি করি। আমরা tx1-কে সেই ব্লকে move করি ঠিক যেমনটি আমরা আগে tx-এর সাথে করেছি। তারপর, পরে, আমরা মূল tx-কে একটি নতুন অ্যাসিঙ্ক্রোনাস ব্লকে move করি, যেখানে আমরা সামান্য ধীর বিলম্বের সাথে আরও মেসেজ পাঠাই। আমরা এই নতুন অ্যাসিঙ্ক্রোনাস ব্লকটিকে মেসেজ গ্রহণ করার অ্যাসিঙ্ক্রোনাস ব্লকের পরে রাখি, তবে এটি এর আগেও যেতে পারত। মূল বিষয় হল ফিউচারগুলি যে ক্রমে অপেক্ষা করা হয়, সেগুলি যে ক্রমে তৈরি করা হয় তাতে নয়।

মেসেজ পাঠানোর জন্য উভয় অ্যাসিঙ্ক্রোনাস ব্লককেই async move ব্লক হতে হবে যাতে tx এবং tx1 উভয়ই সেই ব্লকগুলি শেষ হলে ড্রপ করা হয়। অন্যথায়, আমরা একই অসীম লুপে ফিরে যাব যেখানে আমরা শুরু করেছিলাম। অবশেষে, আমরা অতিরিক্ত ফিউচার হ্যান্ডেল করতে trpl::join থেকে trpl::join3-এ পরিবর্তন করি।

এখন আমরা উভয় সেন্ডিং ফিউচার থেকে সমস্ত মেসেজ দেখতে পাই এবং যেহেতু সেন্ডিং ফিউচারগুলি পাঠানোর পরে সামান্য ভিন্ন বিলম্ব ব্যবহার করে, তাই মেসেজগুলিও সেই ভিন্ন ব্যবধানে রিসিভ করা হয়।

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

এটি একটি ভাল শুরু, তবে এটি আমাদের শুধুমাত্র কয়েকটি ফিউচারের মধ্যে সীমাবদ্ধ করে: join-এর সাথে দুটি বা join3-এর সাথে তিনটি। আসুন দেখি কিভাবে আমরা আরও ফিউচারের সাথে কাজ করতে পারি।