TransWikia.com

Protecting read/write access to/from a file from multiple threads

Stack Overflow Asked by Molly Birk on December 20, 2021

This is a follow on from a previous question here – I received some wonderful advice that helped me move my code along. For the next piece of the puzzle, I figured it warranted a new post – I hope that’s okay.

I have some code that creates requests in a main loop, to read from or write to a file and executes each request in its own thread. With the help I got from the earlier post, I was able to extend my code to add a request queue with multiple entries and read/write functions that merely sleep for a short time to emulate file access.

Now I want to actually learn how to read and write to/from the files when there can potentially one or more threads trying to read and/or write the same file at the same time.

To make this easier to test, I limit the file to a single instance otherwise I need to consider the cases where the file doesn’t exist etc. In the real application, there will be several hundred files in play but my limited understanding suggests that if I can make the locking mechanism work for a single file, it’ll work when there are many.

I am still trying improve my understanding of threading and first tried adding an std::mutex with a global lock variable in the read_file() & write_file() functions but I got into a terrible mess.

Can someone please point me at the correct approach I should investigate to make this work in a robust fashion.

#include <fstream>
#include <future>
#include <iostream>
#include <sstream>
#include <iomanip>
#include <string>
#include <random>

std::vector< std::future<std::string> > requests;

int random_int(int start, int end)
{
    std::random_device rd;
    std::mt19937 generator(rd());
    std::uniform_int_distribution<> distrib(start, end);

    return distrib(generator);
}

const std::string generate_filename()
{
    std::ostringstream filename;

    // use a single file for testing
    //filename << "file_" << std::setfill('0') << std::setw(2) << random_int(1, 20) << ".txt";

    filename << "file.txt";

    return filename.str();
}

std::string write_file(const std::string filename)
{
    std::cout << "write_file: filename is " << filename << std::endl;

    // slow things down so i can follow
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    std::ofstream ofs(filename);
    if (!ofs)
    {
        return std::string("ERROR");
    }

    const char chr = 'A' + random_int(0, 25);
    for (int i = 0; i < 64; ++i)
    {
        ofs << chr;
    }
    ofs << std::endl;
    ofs.close();

    std::cout << "write_file: written to " << filename << std::endl;

    return std::string("WRITTEN");
}

std::string read_file(const std::string filename)
{
    std::cout << "read_file: filename is " << filename << std::endl;

    // slow things down so i can follow
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    std::ifstream ifs(filename);
    if (!ifs.is_open())
    {
        return std::string("ERROR OPEINING FILE");
    }

    std::string contents;
    if (std::getline(ifs, contents))
    {
        std::cout << "    read_file: read from " << filename << std::endl;
        return std::string(contents);
    }

    return std::string("ERROR READING CONTENTS");
}

void add_request()
{
    // randomly add a read or a write request
    if (random_int(1, 50) > 25)
        requests.push_back(std::async(std::launch::async, write_file, generate_filename()));
    else
        requests.push_back(std::async(std::launch::async, read_file, generate_filename()));
}

int main(int argc, char* argv[])
{
    int max_requests = 10;

    // avoid falling out of the loop on first pass
    add_request();

    do {
        std::cout << "working: requests in queue = " << requests.size() << std::endl;

        // randomly add a request if we still have not added the max
        if (random_int(1, 5) == 1)
        {
            if (--max_requests > 0)
            {
                add_request();
            }
        }

        // service the future for each item in the request queue
        for (auto iter = requests.begin(); iter != requests.end(); )
        {
            if ((*iter).wait_for(std::chrono::milliseconds(1)) == std::future_status::ready)
            {    
                std::cout << "Request completed, removing it from the queue: result: " << (*iter).get() << std::endl;
                iter = requests.erase(iter);
            }
            else
            {
                ++iter;
            }
        }

    // once the queue is empty we exit - in the real app, we do not 
    // and keep processing requests until the app exits normally
    } while (requests.size() > 0);
}

2 Answers

As an alternative to what David Schwartz suggests, instead of keeping shared state and using a std::mutex to guard it, you could instead make use of your operating system's ability to place locks on files. For example, on any UNIX-like operating system, you can use flock() to lock a file, either in shared mode (to allow multiple concurrent readers), or exclusive mode (for a single writer). This would even allow multiple instances of your program to run, accessing the same files without stepping on each others toes. The drawback is that it is not portable, and even on platforms that support it there is no way to get the UNIX file descriptor from a std::ifstream, so you would have to use the POSIX API to read and write files instead of functions from <iostream>. However, since the locks are advisory, you could first call POSIX open() on the file, lock it, and then create a std::ifstream or std::ofstream.

Another issue with multiple threads accessing the same file is that, even if you do proper locking, there is no guarantee in which order the threads are run. Perhaps it is better to not start all operations in parallel, but rather have a per-file queue of pending operations, and have only one thread per file processing these pending operations.

Answered by G. Sliepen on December 20, 2021

Here's the algorithm each thread should follow:

  1. Acquire the lock that protects the shared state of files.
  2. See if the file we are trying to access exists in the table.
  3. If it does not exist, create it.
  4. Check the entry for the file we are trying to access.
  5. If no other thread is accessing the file, jump to step 9.
  6. Release the lock that protects the shared state of files.
  7. Wait
  8. Go to step 1.
  9. Mark the file in use.
  10. Release the lock that protects the shared state of files.
  11. Read or write the file as appropriate.
  12. Acquire the lock that protects the shared state of files.
  13. Mark the file not in use.
  14. Release the lock that protects the shared state of files.

Note that if you use a condition variable to make the wait more efficient, then steps 6, 7 and 8 turn into waiting on the condition variable and then jumping to step 2. Also, you would need to broadcast the condition variable (notify all) before or after step 14. (Preferably before.)

Answered by David Schwartz on December 20, 2021

Add your own answers!

Ask a Question

Get help from others!

© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP