
Parallelized web crawler using goroutines and channels

Code Review Asked by Snowbody on January 10, 2021

As part of the "A Tour of Go" section on golang.org, I was trying to parallelize a (formerly single-threaded) web crawler using goroutines. I got it working, but it doesn't seem to "flow" right; there's a bunch of duplicated code. Looking for advice on how to make it seem a bit more Go-literate.

package main

import (
    "fmt"
    "sync"
)

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

type safeMap = struct {
    seen map[string]bool
    mu sync.Mutex
    remaining int
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.

func gci(url string, depth int, fetcher Fetcher, m *safeMap, fc chan string) {
    go crawl_int(url, depth, fetcher, m, fc)
}

func crawl_int(url string, depth int, fetcher Fetcher, m* safeMap, fc chan string) {
    if depth > 0 {
        body, urls, err := fetcher.Fetch(url)
        //fc <- fmt.Sprintf("Crawling %s\n", url)
        m.mu.Lock()
        defer m.mu.Unlock()

        if err != nil {
            fc <- fmt.Sprintf("%vn",err)
        } else {
            fc <- fmt.Sprintf("found: %s %q %dn", url, body, len(urls))
            for _, u := range urls {
                _, found := m.seen[u]
                if !found {
                    m.remaining += 1
                    m.seen[u] = true
                    defer gci(u, depth-1, fetcher, m, fc)
                }
            }
        }
    } else {
        m.mu.Lock()
        defer m.mu.Unlock()
    }       
    m.remaining -= 1
    //fc <- fmt.Sprintf("finished %s remaining to %dn", url, m.remaining)
    if (m.remaining == 0) {
        //fc <- fmt.Sprintf("closing")
        close(fc)
    }
}   

func Crawl(url string, depth int, fetcher Fetcher, ch chan string) {
    // Fetches URLs in parallel.
    // Doesn't fetch the same URL twice.
    c := safeMap{seen: make(map[string]bool), remaining: 1}

    go crawl_int(url, depth, fetcher, &c, ch)
}

func main() {
    ch := make(chan string,5)
    Crawl("https://golang.org/", 4, fetcher, ch)
    for u := range ch {
        fmt.Print(u)
    }
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
    body string
    urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
    if res, ok := f[url]; ok {
        return res.body, res.urls, nil
    }
    return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
    "https://golang.org/": &fakeResult{
        "The Go Programming Language",
        []string{
            "https://golang.org/pkg/",
            "https://golang.org/cmd/",
        },
    },
    "https://golang.org/pkg/": &fakeResult{
        "Packages",
        []string{
            "https://golang.org/",
            "https://golang.org/cmd/",
            "https://golang.org/pkg/fmt/",
            "https://golang.org/pkg/os/",
        },
    },
    "https://golang.org/pkg/fmt/": &fakeResult{
        "Package fmt",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
    "https://golang.org/pkg/os/": &fakeResult{
        "Package os",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
}

One Answer

OK, first off: back when Go was first released, a lot of people criticised it for being "too opinionated". The coding style was/is very specific (indentation, brackets, etc. are all standardised). This has turned out to be a great thing: code written by anyone, provided they've used gofmt, looks very similar, which makes it easy to read regardless of who wrote it. In addition to the things gofmt takes care of, I would strongly recommend you look at the Go Code Review Comments wiki and adopt the recommendations listed there. Function names like crawl_int should be camelCased (crawlInt). In fact, the int part should probably be omitted. Acronyms like gci (which I assume is short for "go crawl int") should be capitalised or spelled out. I'll go through your code and change the names to their more idiomatic counterparts.

General comments

There are a couple of issues here. Your crawl_int function is closing the channel (fc). This is not where your channel should be closed. The rule of thumb is that channels are closed in the same scope as where they're created. Exceptions to this rule are not uncommon (e.g. closures), but 95% of the time you'll close a channel either at the same level as where you create it, or in a function that has access to the scope creating the channel. Because you're recursively crawling URLs, it's more than likely that one routine will reach the point where it thinks it can close the channel while another routine is still trying to write to it. Sending on a closed channel causes a runtime panic, something you'll want to avoid at all cost.

Another comment I'd have is that you're keeping track of which URLs you've already checked using a map (map[string]bool). You quite rightly use a mutex to avoid concurrent access, but you only release the locks you acquire when your function/routine returns (owing to your use of defer). The result is that you can spawn countless routines, and even increase the channel buffer size to an insane number, but all routines will just wait until the lock is released and execute sequentially. I suspect this is the reason why you created the gci function and invoke it on your defer stack, rather than just spawning the routine in place.
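To illustrate the difference, here's a contrived sketch (not your actual code; slowWork is a made-up stand-in for fetcher.Fetch, and it assumes "sync" and "time" are imported):

func slowWork(url string) { time.Sleep(10 * time.Millisecond) } // hypothetical stand-in for Fetch

func markSeen(mu *sync.Mutex, seen map[string]bool, url string) {
    // Holding the lock for the whole call (mu.Lock() + defer mu.Unlock() at the top)
    // would force every routine through this function one at a time.
    slowWork(url) // do the slow part outside the critical section

    mu.Lock()
    seen[url] = true
    mu.Unlock() // release immediately instead of deferring to the end of the function
}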
Something worth noting here, too, is that bool is a type that takes up space. Not much, but it takes up memory. The language specification states that an empty struct (like s := struct{}{}) is a 0-byte value. If you need a map that keeps track of values you've already seen, then the keys are the only data that really matters. As such, using map[string]struct{} is a more efficient type.

You're also initialising the map using make(map[string]bool). There are times when you want to use make: e.g. cases where you know how big the map will need to be, and you want to cut down on the number of times the runtime has to allocate additional memory. In those situations you'll write something like cpy := make(map[string]struct{}, len(src)). When you don't know or care about allocation cycles, it's shorter (and more common) to just use a literal: seen: map[string]struct{}{}.
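A quick sketch of both points (the URLs are just the ones from the exercise; the variable names are made up):

// a set of seen URLs: the keys carry all the information, the values take up no space
seen := map[string]struct{}{}
seen["https://golang.org/"] = struct{}{}

if _, ok := seen["https://golang.org/pkg/"]; !ok {
    seen["https://golang.org/pkg/"] = struct{}{} // not seen yet: mark it and go crawl it
}

// when the final size is known up front, pre-size with make to reduce allocations
src := []string{"https://golang.org/", "https://golang.org/pkg/"}
cpy := make(map[string]struct{}, len(src))
for _, u := range src {
    cpy[u] = struct{}{}
}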

In your comments, I couldn't help notice the use of the word "parallel". It may seem like a distinction without a difference, but golang deals in concurrency, not parallelism. Some routines may actually run in separate threads, but that's not necessarily the case.

With all this out of the way, let's get down to the actual code

The code

I'll just go through the code you have and rework it, providing some clarification as to why I'm doing so. I've omitted the Fetcher interface, as it's not that pertinent to the code you've shared here.

package main

import (
    "context" // added this
    "fmt"
    "sync"
)

// the = sign turns this into an alias for an anonymous struct; you almost certainly just want a plain named type
// type safeMap = struct {
type pageMap struct {
    URLs *sync.Map // use the sync.Map type, safe for concurrent use, and has some useful features for us
    wg   *sync.WaitGroup // your remaining field is just tracking how many routines are running, aka a waitgroup
}

Right, so this type has changed a lot. I've swapped out the mutex + map for a sync.Map. This type is designed to be safe for concurrent access, and it has a nifty method: LoadOrStore. That makes it easy for us to check whether we've seen a URL before; if we haven't, it's atomically added to the map and we can spin up a routine to crawl said URL.
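To make the LoadOrStore behaviour concrete, here's a minimal sketch (just an illustration, not part of the crawler; it assumes "fmt" and "sync" are imported):

var seen sync.Map // safe for concurrent use; the zero value is ready to go

// first call: the key is new, so it gets stored and loaded is false
if _, loaded := seen.LoadOrStore("https://golang.org/", struct{}{}); !loaded {
    fmt.Println("first time we see this URL, crawl it")
}

// second call: the key already exists, so loaded is true and we skip it
if _, loaded := seen.LoadOrStore("https://golang.org/", struct{}{}); loaded {
    fmt.Println("already claimed by another routine, skip")
}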

The remaining counter is just a manual way of tracking whether or not we've finished checking all the links, so we know when to close the channel. Because we've gotten rid of the mutex, we'd now be accessing this field concurrently, something we don't want to do. Instead of faffing around with the atomic package, we can simply use the purpose-built WaitGroup type to achieve the same thing. Well, almost the same thing. The added bonus is that we can delegate waiting for the work to be done to our caller, who can then close the channel, safe in the knowledge that all the work is done (and thus that no routine will still be writing to the channel).
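In isolation, that delegation pattern looks something like this (a bare-bones sketch with made-up values; the real version follows below, and it assumes "fmt" and "sync" are imported):

wg := &sync.WaitGroup{}
ch := make(chan string, 5)

work := func(msg string) {
    defer wg.Done() // every writer signals the waitgroup when it's finished
    ch <- msg
}

wg.Add(2)
go work("hello")
go work("world")

go func() {
    wg.Wait() // all writers are done...
    close(ch) // ...so it's now safe for the channel's owner to close it
}()

for s := range ch {
    fmt.Println(s)
}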

So let's start crawling (logically, that means taking a look at the Crawl function):

// StartCrawl short description in this format, which automates the godoc generated documentation
// we're returning the pageMap here, back to main so it can check for the waitgroup status
// I moved Fetcher to the front, because it's kind of the critical "thing"
// I've also changed the channel argument to a directional one.
// this ensures nobody mistakenly adds code reading from this channel later on, and shows intent
// I've also added a context argument, so you can cleanly exit on receiving a kill signal
func StartCrawl(ctx context.Context, fetcher Fetcher, URL string, depth int, ch chan<- string) *pageMap {
    c := &pageMap{ // assign pointer
        URLs: &sync.Map{},
        wg:   &sync.WaitGroup{},
    }
    // start crawling
    c.wg.Add(1) // adding 1 routine to the WaitGroup

    // passing on context, fetcher, pageMap, etc...
    // renamed crawl_init to crawl, this is basically the "init" function, after all
    go crawl(ctx, fetcher, c, URL, depth, ch)
    return c
}

These changes are pretty self-explanatory, although this entire function simply passes through the arguments you already had and now returns the pageMap back to the caller. This function is, clearly, extraneous, so we could move it all up to our main function. We'll do that at the end. If you were to try to compile this, you'd get an error where we pass the channel: this StartCrawl function takes a chan<- string argument (a write-only channel), while the crawl function still expects a bidirectional channel (chan string). That isn't allowed, obviously. Because we want to make sure this channel isn't read from, we've restricted it, and so we have to change the functions further down the call chain, too.
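To make the directionality point concrete, here's a tiny standalone sketch (the names are made up; it's not part of the crawler, and it assumes "fmt" is imported):

func produce(out chan<- string) { // send-only: this function cannot read from out
    out <- "hello"
}

func main() {
    ch := make(chan string, 1) // bidirectional where it's created
    produce(ch)                // fine: chan string converts to chan<- string automatically
    fmt.Println(<-ch)

    // the reverse doesn't compile: a chan<- string can't be passed where a
    // chan string is expected, which is why crawl's signature has to change too
}

With that out of the way, let's rework the crawling function itself: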

// func gci() {} is removed: it's a single call wrapped in a function, which is inefficient and silly

// change arguments to match our previous changes
func crawl(ctx context.Context, fetcher Fetcher, m *pageMap, URL string, depth int, ch chan<- string) {
    defer m.wg.Done() // when this routine returns, update waitgroup

    body, URLs, err := fetcher.Fetch(URL)
    if err != nil {
        ch <- fmt.Sprintf("Error fetching %s: %+v", URL, err) // add some more info
        return // we don't need to continue doing this here. get rid of the else
    }

    ch <- fmt.Sprintf("URL %s - %qn found %d URLs", URL, body, len(URLs))
    // have we reached the "deepest" level?
    if depth <= 1 {
        return // we have, exit the routine
    }

    // we still have more levels to go
    for _, URL := range URLs {
        // LoadOrStore's second return value is true if the key (URL) already existed; if not, the key is added to the map
        if _, ok := m.URLs.LoadOrStore(URL, nil); !ok {
            // we have a new URL
            m.wg.Add(1) // we need to crawl this new page, so add new routine to the waitgroup
            // spin up the new routine, depth -1
            go crawl(ctx, fetcher, m, URL, depth-1, ch)
        }
    }
}

So what we have here is a function that doesn't have to faff around with a mutex, doesn't have to use defer to spawn more routines, and doesn't contain a single else. We can just end our routine if an error occurs, or once we've reached the end of the depth/URLs we need to scrape. To my eye, at least, the code instantly looks a lot cleaner. Most importantly, though: we're not closing the channel in the routine that writes to it. Remember: channels can be shared between many routines (as indeed this channel is). Determining whether it's safe to close a channel in one place, without that place being aware of the other routines, is tricky at best. So how do we know when to close this channel, and how can we wait for that moment while still printing the output?

That's quite easily done if we change our main function:

func main() {
    ctx, cfunc := context.WithCancel(context.Background())
    defer cfunc()
    done := make(chan struct{})
    ch := make(chan string, 5)
    // start waiting for values to print
    go func() {
        for s := range ch {
            fmt.Println(s)
        }
        close(done) // we're done printing
    }()
    crawler := StartCrawl(ctx, fetcher, "https://golang.org/", 4, ch)
    crawler.wg.Wait() // wait for the waitgroup to hit 0, indicating the work is done
    close(ch)   // close the channel now that the work is done
    <-done      // wait for printing to be done
    // all done, return from main
}

So that's it, then. We open and close the string channel in our main routine, read and print the data on our string channel in a dedicated routine, and use a simple 0-byte, non-buffered channel to force our main function to wait until everything is done so we can safely exit our program.

You may have noticed that this done channel is created in the scope of main but closed in a routine, despite my having just said this isn't always a good idea. The difference here is that the channel never leaves the scope of main, and it's closed in a closure that has full access to that scope. It's pretty obvious that, unless you start passing this very done channel around to countless other routines, this is perfectly safe.

What's next

So I've added this context argument to your code, and at the moment, it's just sitting there, doing nothing but forcing us to type more stuff. That's a bit annoying, for sure. Why, then, would we bother adding it?

Put simply, if you run an application with many routines, it's reasonable to assume execution time can be variable, and sometimes it can take quite long for the application to finish doing whatever it's doing. Sometimes you (or your OS) may want to send a KILL or TERM signal to the process. In such an event, we want to be able to cleanly exit all the same: close the channels, stop doing what we're doing, etc... This is actually a very easy thing to do by cancelling the context. The context object basically can be used to store information about the context in which your application (or a subset thereof) runs. If the main process receives a kill signal, all routines should be notified, and should stop doing what they're doing. This is what that looks like in our code:

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
)

func main() {
    ctx, cfunc := context.WithCancel(context.Background())
    defer cfunc() // this ALWAYS needs to be called, multiple calls are fine
    sig := make(chan os.Signal, 1)
    // register channel so we get notified of kill/interrupt signals
    signal.Notify(sig, os.Interrupt, os.Kill)
    go func() {
        defer signal.Stop(sig) // deregister, so the signal package won't send on sig after this routine returns
        select {
        case <-sig: // we've received a signal
            cfunc() // cancel the context
            return
        case <-ctx.Done(): // app terminated normally
            return
        }
    }()

    // this is the same as before
    done := make(chan struct{})
    ch := make(chan string, 5)
    go func() {
        for s := range ch {
            fmt.Println(s)
        }
        close(done)
    }()
    // no need for StartCrawl, let's just create it all in main
    crawler := &pageMap{
        URLs: &sync.Map{},
        wg:   &sync.WaitGroup{},
    }

    crawler.wg.Add(1)

    go crawl(ctx, fetcher, crawler, "https://golang.org/", 4, ch)

    crawler.wg.Wait() // wait for all crawl routines to finish
    close(ch)
    <-done
}

So we've gotten rid of the StartCrawl function, and added some code to set our program up to register signals (interrupt and kill) from our OS. When we get such a signal, the context is cancelled. Obviously, our crawl routine doesn't know about that just yet, so we have to make a few adjustments to the code there:

// write to channel, returns false if context is cancelled
func writeChan(ctx context.Context, s string, ch chan<- string) bool {
    // wait until either the context is cancelled, or we can write to the channel
    select {
    case <-ctx.Done():
        return false
    case ch <- s:
        return true
    }
}

func crawl(ctx context.Context, fetcher Fetcher, m *pageMap, URL string, depth int, ch chan<- string) {
    defer m.wg.Done() // when this routine returns, update waitgroup

    body, URLs, err := fetcher.Fetch(URL)
    if err != nil {
        _ = writeChan(ctx, fmt.Sprintf("Error fetching %s: %+v", URL, err), ch)
        return
    }

    // depth reached, or context cancelled -> return
    if ok := writeChan(ctx, fmt.Sprintf("URL %s - %q\n found %d URLs", URL, body, len(URLs)), ch); !ok || depth <= 1 {
        return
    }

    for _, URL := range URLs {
        if _, ok := m.URLs.LoadOrStore(URL, nil); !ok {
            select {
            case <-ctx.Done():
                return // context cancelled
            default:   // carry on as usual
                m.wg.Add(1)
                go crawl(ctx, fetcher, m, URL, depth-1, ch)
            }
        }
    }
}

Now in real life, whatever this Fetcher is you're using, it's most likely going to use a net/http.Client under the bonnet. You would pass the context to this client, or include the context in the request you're sending. Internally, the request will be aborted if the context is cancelled, so everything returns as early as possible. In that case, I wouldn't bother adding the select that checks whether the context is done before spawning the new routine, but I thought I'd include it here for completeness' sake.
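As a rough sketch of what that could look like (this is not the exercise's Fetcher; the Fetch signature would have to grow a context parameter, link extraction is left out, and it assumes "context", "io" and "net/http" are imported):

// httpFetcher is a hypothetical Fetcher backed by an *http.Client.
type httpFetcher struct {
    client *http.Client
}

func (f httpFetcher) Fetch(ctx context.Context, url string) (string, []string, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return "", nil, err
    }
    resp, err := f.client.Do(req) // aborts the request early if ctx is cancelled
    if err != nil {
        return "", nil, err
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", nil, err
    }
    // parsing URLs out of the body is omitted from this sketch
    return string(body), nil, nil
}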

Disclaimer

I've gone through your code a couple of times, but it's possible I didn't catch everything (the fetcher part, I've sort of ignored). All the code I wrote as part of my review was written off-the-cuff, with no testing, so it may contain bugs, typos, omissions, etc... The key thing here was to explain how I'd write this type of code, why, and provide some code snippets to serve as a decent starting point/illustration of what the code eventually would look like.

Have fun.

Answered by Elias Van Ootegem on January 10, 2021
