Multithreaded Client that sends data in a queue and stores data in another, while not blocking in Rust Tokio

Stack Overflow Asked by Ollegn on November 12, 2021

I’m having difficulties in making a Tokio client that receives packets from a server and stores them in a queue for the main thread to process, while being able to send packets to the server from another queue at the same time.

I’m trying to make a very simple online game demonstration: a game client that sends data (its own modified state, like player movement) and receives data (game state modified by other players and the server, like an NPC or another player that also moved).

The idea is to have a network thread that accesses two Arcs holding Mutexes to Vec<bytes::Bytes> that store serialized data. One Arc is for IncomingPackets, and the other for OutgoingPackets. IncomingPackets would be filled by packets sent from the server to the client that would be later read by the main thread, and OutgoingPackets would be filled by the main thread with packets that should be sent to the server.

I can’t seem to receive or send packets in another thread.

The client would only connect to the server, and the server would allow many clients (which would be served individually).

The explanations of how streams are used and implemented are not newbie-friendly, but I think I should be using them somehow.

I wrote some code, but it does not work and is probably wrong.

(My original code does not compile, so treat this as pseudocode, sorry)

Playground

extern crate byteorder; // 1.3.4
extern crate futures; // 0.3.5
extern crate tokio; // 0.2.21 

use bytes::Bytes;
use futures::future;
use std::error::Error;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;

use byteorder::{BigEndian, WriteBytesExt};
use std::io;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::{ReadHalf, WriteHalf};

//This is the SharedPackets struct that is located in the crate structures
struct SharedPackets {
    data: Mutex<Vec<bytes::Bytes>>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    let (mut r, mut w) = stream.split();

    let mut inc: Vec<bytes::Bytes> = Vec::new();
    inc.push(Bytes::from("Wow"));

    let mut incoming_packets = Arc::new(SharedPackets {
        data: Mutex::new(inc),
    });

    let mut outg: Vec<bytes::Bytes> = Vec::new();
    outg.push(Bytes::from("Wow"));
    let mut outgoint_packets = Arc::new(SharedPackets {
        data: Mutex::new(outg),
    });

    let mut local_incoming_packets = Arc::clone(&incoming_packets);
    let mut local_outgoint_packets = Arc::clone(&outgoint_packets);
    let mut rarc = Arc::new(Mutex::new(r));
    let mut warc = Arc::new(Mutex::new(w));

    tokio::spawn(async move {
        //send and receive are both async functions that contain an infinite loop
        //they basically use AsyncWriteExt and AsyncReadExt to manipulate both halves of the stream
        //send reads the queue and write this data on the socket
        //recv reads the socket and write this data on the queue
        //both "queues" are manipulated by the main thread
        let mut read = &*rarc.lock().unwrap();
        let mut write = &*warc.lock().unwrap();

        future::try_join(
            send(&mut write, &mut local_outgoint_packets),
            recv(&mut read, &mut local_incoming_packets),
        )
        .await;
    });

    loop {
        //read & write other stuff on both incoming_packets & outgoint_packets
        //until the end of the program
    }
}

async fn recv(reader: &mut ReadHalf<'_>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
    loop {
        let mut buf: Vec<u8> = vec![0; 4096];

        let n = match reader.read(&mut buf).await {
            Ok(n) if n == 0 => return Ok(()),
            Ok(n) => n,
            Err(e) => {
                eprintln!("failed to read from socket; err = {:?}", e);
                return Err(e);
            }
        };
    }
}

async fn send(writer: &mut WriteHalf<'_>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
    loop {
        //task::sleep(Duration::from_millis(300)).await;
        {
            let a = vec!["AAAA"];
            for i in a.iter() {
                let mut byte_array = vec![];
                let str_bytes = i.as_bytes();
                WriteBytesExt::write_u32::<BigEndian>(&mut byte_array, str_bytes.len() as u32)
                    .unwrap();
                byte_array.extend(str_bytes);

                writer.write(&byte_array).await?;
            }
        }
    }
}

This does not compile:

error: future cannot be sent between threads safely
   --> src/main.rs:46:5
    |
46  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::tcp::ReadHalf<'_>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:55:9
    |
52  |           let mut read = &*rarc.lock().unwrap();
    |                            -------------------- has type `std::sync::MutexGuard<'_, tokio::net::tcp::ReadHalf<'_>>` which is not `Send`
...
55  | /         future::try_join(
56  | |             send(&mut write, &mut local_outgoint_packets),
57  | |             recv(&mut read, &mut local_incoming_packets),
58  | |         )
59  | |         .await;
    | |______________^ await occurs here, with `rarc.lock().unwrap()` maybe used later
60  |       });
    |       - `rarc.lock().unwrap()` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/main.rs:52:25
    |
52  |         let mut read = &*rarc.lock().unwrap();
    |                         ^^^^^^^^^^^^^^^^^^^^^

error: future cannot be sent between threads safely
   --> src/main.rs:46:5
    |
46  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::tcp::WriteHalf<'_>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:55:9
    |
53  |           let mut write = &*warc.lock().unwrap();
    |                             -------------------- has type `std::sync::MutexGuard<'_, tokio::net::tcp::WriteHalf<'_>>` which is not `Send`
54  | 
55  | /         future::try_join(
56  | |             send(&mut write, &mut local_outgoint_packets),
57  | |             recv(&mut read, &mut local_incoming_packets),
58  | |         )
59  | |         .await;
    | |______________^ await occurs here, with `warc.lock().unwrap()` maybe used later
60  |       });
    |       - `warc.lock().unwrap()` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/main.rs:53:26
    |
53  |         let mut write = &*warc.lock().unwrap();
    |                          ^^^^^^^^^^^^^^^^^^^^^

I think this is the least of my problems, because I’m really new to Tokio.

I could not find an example of this. Do you know of a performant approach to this problem?

2 Answers

Why don't you use channels for sending and receiving data between tasks? There are plenty of helpful examples here on how to share data between tasks.
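As a rough illustration of the channel idea, here is a minimal sketch that replaces the two shared Vec queues with two channels. It deliberately uses std::sync::mpsc and a plain thread so that it runs without any crates; Tokio offers an async counterpart in tokio::sync::mpsc. The names network_loop, drain_ready and run_demo are made up for this example, and the "network" side just echoes packets back instead of talking to a socket.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

// Hypothetical stand-in for the network thread. It owns one end of each
// channel. A real client would write `packet` to the socket and forward
// whatever it reads; here each outgoing packet is echoed back with a
// marker byte (0xFF) in front so the round trip is visible.
fn network_loop(outgoing: Receiver<Vec<u8>>, incoming: Sender<Vec<u8>>) {
    // `recv` blocks only this thread and returns Err once every Sender
    // for the outgoing channel has been dropped.
    while let Ok(packet) = outgoing.recv() {
        let mut reply = vec![0xFF];
        reply.extend_from_slice(&packet);
        if incoming.send(reply).is_err() {
            break; // the main thread dropped its receiver
        }
    }
}

// Collects whatever has arrived so far without blocking the caller,
// which is what a game loop would call once per frame.
fn drain_ready(incoming: &Receiver<Vec<u8>>) -> Vec<Vec<u8>> {
    let mut packets = Vec::new();
    // `try_recv` returns Err when the channel is empty or disconnected.
    while let Ok(packet) = incoming.try_recv() {
        packets.push(packet);
    }
    packets
}

fn run_demo() -> Vec<Vec<u8>> {
    let (out_tx, out_rx) = mpsc::channel();
    let (in_tx, in_rx) = mpsc::channel();
    let net = thread::spawn(move || network_loop(out_rx, in_tx));

    // The main thread queues packets without taking any lock.
    out_tx.send(b"move".to_vec()).unwrap();
    out_tx.send(b"jump".to_vec()).unwrap();

    // Closing the outgoing channel lets the network thread finish;
    // joining it guarantees both replies are already queued.
    drop(out_tx);
    net.join().unwrap();

    drain_ready(&in_rx)
}

fn main() {
    for packet in run_demo() {
        println!("{:?}", packet);
    }
}
```

The point of the design is that each side owns its end of a channel, so no Mutex guard ever has to live across a blocking or awaiting call.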

EDIT: I looked at your code and noticed you're using the wrong mutex. You should use tokio::sync::Mutex when a guard has to be held across an .await, as it is here. Secondly, there were issues with the references inside the Arcs. I've moved the creation of the Arcs into the spawned task and added cloning of them in the calls to send/recv.

extern crate futures; // 0.3.5
extern crate tokio; // 0.2.21
extern crate byteorder; // 1.3.4


use std::{error::Error};
use std::sync::{Arc};
use tokio::sync::Mutex;
use tokio::net::TcpStream;
use futures::{future};
use bytes::Bytes;

use std::io;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::io::AsyncReadExt;
use tokio::net::tcp::{ReadHalf, WriteHalf};
use byteorder::{BigEndian, WriteBytesExt};




//This is the SharedPackets struct that is located in the crate structures
struct SharedPackets {
   data: Mutex<Vec<bytes::Bytes>>
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    
    let mut inc : Vec<bytes::Bytes> = Vec::new();
    inc.push(Bytes::from("Wow"));

    let mut incoming_packets = Arc::new(SharedPackets {
        data: Mutex::new(inc)
    });

    let mut outg : Vec<bytes::Bytes> = Vec::new();
    outg.push(Bytes::from("Wow"));
    let mut outgoint_packets = Arc::new(SharedPackets {
        data: Mutex::new(outg)
    });

    let mut local_incoming_packets = Arc::clone(&incoming_packets);
    let mut local_outgoint_packets = Arc::clone(&outgoint_packets);
   

    tokio::spawn(async move {
        let mut stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
        let (mut r, mut w) = stream.split();
        let mut rarc = Arc::new(Mutex::new(&mut r));
        let mut warc = Arc::new(Mutex::new(&mut w));

        //send and receive are both async functions that contain an infinite loop
        //they basically use AsyncWriteExt and AsyncReadExt to manipulate both halves of the stream
        //send reads the queue and write this data on the socket
        //recv reads the socket and write this data on the queue
        //both "queues" are manipulated by the main thread
        //let mut read = &*rarc.lock().await;
        //let mut write = &*warc.lock().await;

        future::try_join(
            send(warc.clone(), &mut local_outgoint_packets),
            recv(rarc.clone(), &mut local_incoming_packets),
        )
        .await;
    });

   
    loop {
        //read & write other stuff on both incoming_packets & outgoint_packets
        //until the end of the program
    }
}


async fn recv(readerw: Arc<Mutex<&mut ReadHalf<'_>>>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
    // The guard is held for the whole loop; this task is the only reader.
    let mut reader = readerw.lock().await;
    loop {
        let mut buf: Vec<u8> = vec![0; 4096];

        let n = match reader.read(&mut buf).await {
            Ok(n) if n == 0 => return Ok(()), // 0 bytes read: the peer closed the connection
            Ok(n) => n,
            Err(e) => {
                eprintln!("failed to read from socket; err = {:?}", e);
                return Err(e);
            }
        };
    }
}

async fn send(writerw: Arc<Mutex<&mut WriteHalf<'_>>>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
    // The guard is held for the whole loop; this task is the only writer.
    let mut writer = writerw.lock().await;
    loop {
        //task::sleep(Duration::from_millis(300)).await;
        {
            let a = vec!["AAAA"];
            for i in a.iter() {
                // Frame layout: 4-byte big-endian length, then the payload.
                let mut byte_array = vec![];
                let str_bytes = i.as_bytes();
                WriteBytesExt::write_u32::<BigEndian>(&mut byte_array, str_bytes.len() as u32).unwrap();
                byte_array.extend(str_bytes);

                // write_all keeps writing until the whole frame is sent; write may stop early.
                writer.write_all(&byte_array).await?;
            }
        }
    }
}

Here is the full code without errors, though I didn't test it: Playground link
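One gap worth noting: send prefixes every payload with a 4-byte big-endian length, but recv only reads raw chunks of up to 4096 bytes, and TCP does not preserve message boundaries. The following is a minimal sketch of how the receiving side could reassemble packets before queueing them, assuming the same length-prefix layout. encode_frame and take_frames are hypothetical helper names, and u32::to_be_bytes stands in for the byteorder call so the example needs no crates.

```rust
// Builds one frame: 4-byte big-endian length followed by the payload.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(4 + payload.len());
    frame.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}

// Removes every complete frame from the front of `buf` and returns the
// payloads. Bytes of a frame that has not fully arrived stay in `buf`
// until more data is appended.
fn take_frames(buf: &mut Vec<u8>) -> Vec<Vec<u8>> {
    let mut frames = Vec::new();
    let mut pos = 0;
    while buf.len() - pos >= 4 {
        let mut len_bytes = [0u8; 4];
        len_bytes.copy_from_slice(&buf[pos..pos + 4]);
        let len = u32::from_be_bytes(len_bytes) as usize;
        if buf.len() - pos - 4 < len {
            break; // header is here, but the body is still incomplete
        }
        frames.push(buf[pos + 4..pos + 4 + len].to_vec());
        pos += 4 + len;
    }
    buf.drain(..pos);
    frames
}

fn main() {
    // Two frames back to back, as they might sit on the wire.
    let mut wire = encode_frame(b"AAAA");
    wire.extend(encode_frame(b"hello"));

    // Simulate the socket delivering the bytes in two arbitrary chunks.
    let (first, second) = wire.split_at(10);
    let mut buf = Vec::new();

    buf.extend_from_slice(first);
    println!("after chunk 1: {:?}", take_frames(&mut buf));

    buf.extend_from_slice(second);
    println!("after chunk 2: {:?}", take_frames(&mut buf));
}
```

In recv, the idea would be to append the n bytes just read to a buffer that lives outside the loop and push whatever take_frames returns onto the incoming queue.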

Answered by ktrapez on November 12, 2021

Here's an example that's a bit contrived, but it should help:

Playground link

use std::{sync::Arc, time::Duration};
use tokio::{self, net::TcpStream, sync::Mutex};

#[tokio::main]
async fn main() {
    let mut incoming_packets = Arc::new(Mutex::new(vec![b"Wow".to_vec()]));

    let mut local_incoming_packets = incoming_packets.clone();

    tokio::spawn(async move {
        for i in 0usize..10 {
            tokio::time::delay_for(Duration::from_millis(200)).await;
            let mut packets = local_incoming_packets.lock().await;

            packets.push(i.to_ne_bytes().to_vec());
        }
    });

    loop {
        tokio::time::delay_for(Duration::from_millis(200)).await;
        let packets = incoming_packets.lock().await;
        dbg!(packets);
    }
}

You can see that I have to clone outside of the async move block since that block takes ownership of everything inside it. I'm not sure about r and w but you might need to move those inside the block as well before you can pass mutable references to them. I can update my answer if you provide code that includes all of the proper use statements.

One thing that you need to remember is that main() can technically return before the spawned task has finished, and the task is dropped when that happens.

Also, note that I used tokio::sync::Mutex so that you can yield while waiting to acquire the lock.
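For comparison, the compiler error in the question comes from a std::sync::MutexGuard being alive across an .await. If the lock is only needed briefly, an alternative is to keep std::sync::Mutex and drop the guard before awaiting. The sketch below shows that pattern under some loud assumptions: fake_send is a placeholder for real socket I/O, flush_queue and assert_send are made-up names, and block_on is a tiny stand-in for the Tokio runtime that only works because nothing here ever actually waits.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type Queue = Arc<Mutex<Vec<Vec<u8>>>>;

// Hypothetical placeholder for socket I/O; returns how many bytes it "sent".
async fn fake_send(packet: Vec<u8>) -> usize {
    packet.len()
}

async fn flush_queue(queue: Queue) -> usize {
    // Take everything out of the queue inside a block so the guard is
    // dropped before the first `.await` below.
    let pending = {
        let mut guard = queue.lock().unwrap();
        std::mem::take(&mut *guard)
    };
    let mut sent = 0;
    for packet in pending {
        sent += fake_send(packet).await;
    }
    sent
}

// Compile-time check: this is the `Send` bound that `tokio::spawn` requires.
fn assert_send<T: Send>(value: T) -> T {
    value
}

// Minimal stand-in for a runtime: polls in a loop with a waker that does
// nothing. Fine for futures that are ready immediately, not for real I/O.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Returns (bytes sent, packets left in the queue afterwards).
fn run_demo() -> (usize, usize) {
    let queue: Queue = Arc::new(Mutex::new(vec![b"Wow".to_vec(), b"AAAA".to_vec()]));
    let fut = assert_send(flush_queue(Arc::clone(&queue)));
    let sent = block_on(fut);
    let left = queue.lock().unwrap().len();
    (sent, left)
}

fn main() {
    let (sent, left) = run_demo();
    println!("sent {} bytes, {} packets left in the queue", sent, left);
}
```

Moving the lock() call out of the inner block, so the guard is still in scope at the .await, should make assert_send fail to compile with the same kind of error as in the question.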

Answered by richardpringle on November 12, 2021
