TransWikia.com

Combining imagery, masking and sampling in Descartes Labs Platform

Geographic Information Systems Asked by Madeline Lisaius on November 18, 2020

I am trying to sample data from Descartes Lab imagery to train a random forest classifier in the Descartes Plaform for an agricultural region. I have already written a script to prepare the imagery of interest (Sentinel-2, Sentinel-1 and Landsat 8) using the Descartes Labs cloud masks where applicable, and I have not figured out yet how to sample points. I see an example in the Descartes Labs platform that imports training data the example, but I would like to create my samples within the platform. How can I create a sample data set within the platform at scale?

First I import the packages of interest, identify my AOI (the central valley of California) and dates of interest:

# Import packages 
import descarteslabs as dl
import descarteslabs.workflows as wf
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from shapely.geometry import shape, box
from random import random
from tqdm.notebook import tqdm

sac = shape(dl.places.shape(
    'north-america_united-states_california_sacramento-valley'
).geometry)

sj = shape(dl.places.shape(
    'north-america_united-states_california_san-joaquin-valley'
).geometry)


central_valley_aoi = sac.union(sj)

start_datetime = "2019-01-01"
end_datetime = "2019-03-28"

I then create tiles with my AOI:

tiles = dl.raster.dltiles_from_shape(resolution=10,
                                     tilesize=500,
                                     pad=0,
                                     shape=central_valley_aoi)

I then write out my functions to prep each Landsat 8, Sentinel-1 and Sentinel-2 imagery using .
Landsat 8:

l8_stack = (wf.ImageCollection.from_id('landsat:LC08:01:T1:TOAR', 
                               start_datetime=start_datetime, 
                               end_datetime=end_datetime)
            .pick_bands('red green blue nir swir1 derived:ndvi')
           )

l8_cloud_mask = (wf.ImageCollection.from_id('landsat:LC08:01:T1:TOAR:dlcloud:v1', 
                               start_datetime=start_datetime, 
                               end_datetime=end_datetime)
                .pick_bands('valid_cloudfree')
                )

l8_stack = l8_stack.concat_bands(l8_cloud_mask)

l8_masked = l8_stack.map(lambda img: img.mask(img.pick_bands('valid_cloudfree')==0))

l8_daily = (l8_masked
            .groupby(dates=('year', 'month', 'day'))
            .mosaic()
            .pick_bands('red green blue nir swir1 derived:ndvi')
)

Sentinel-2:

s2_stack = (wf.ImageCollection.from_id('sentinel-2:L1C', 
                               start_datetime=start_datetime, 
                               end_datetime=end_datetime)
            .pick_bands('red green blue red-edge nir swir1 derived:ndvi')
           )

s2_cloud_mask = (wf.ImageCollection.from_id('sentinel-2:L1C:dlcloud:v1', 
                               start_datetime=start_datetime, 
                               end_datetime=end_datetime)
                .pick_bands('valid_cloudfree')
                )

s2_stack = s2_stack.concat_bands(s2_cloud_mask)

s2_masked = s2_stack.map(lambda img: img.mask(img.pick_bands('valid_cloudfree')==0))

s2_daily = (s2_masked
            .groupby(dates=('year', 'month', 'day'))
            .mosaic()
            .pick_bands('red green blue red-edge nir swir1 derived:ndvi')
)

Sentinel-1:

s1_stack = (wf.ImageCollection.from_id('sentinel-1:GRD', 
                               start_datetime=start_datetime, 
                               end_datetime=end_datetime)
            .pick_bands('vh vv')
           )
s1_daily = (s1_stack
            .groupby(dates=('year', 'month', 'day'))
            .mosaic()
            .pick_bands('vh vv' )
)

These functions to prep each satellite imagery can be run easily with these simple lines and ideally will be called by tile, although I haven’t figured out how to integrate the Cropland Mask:

s2_data = s2_daily.compute(tile)
l8_data = l8_daily.compute(tile)
s1_data = s1_daily.compute(tile)

I then prep the cropland mask (where specific crops have been grown in California more than twice in the past for year and/or in 2019 to create a binary array:

cropland_2016, ctx = dl.scenes.search(tile_cord,
                    products='usda:cdl:v1',
                    start_datetime="2016-12-01",
                    end_datetime="2017-01-01",
                    limit=5
                   )

cropland_2017, ctx = dl.scenes.search(tile_cord,
                    products='usda:cdl:v1',
                    start_datetime="2017-12-01",
                    end_datetime="2018-01-01",
                    limit=5
                   )

cropland_2018, ctx = dl.scenes.search(tile_cord,
                    products='usda:cdl:v1',
                    start_datetime="2018-12-01",
                    end_datetime="2019-01-01",
                    limit=5
                   )

cropland_2019, ctx = dl.scenes.search(tile_cord,
                    products='usda:cdl:v1',
                    start_datetime="2019-12-01",
                    end_datetime="2020-01-01",
                    limit=5
                   )

#Arrays of Cropland Data Layer by year
cld_16 = cropland_2016[0].ndarray('class', ctx)
cld_17 = cropland_2017[0].ndarray('class', ctx)
cld_18 = cropland_2018[0].ndarray('class', ctx)
cld_19 = cropland_2019[0].ndarray('class', ctx)

#Cropland Data Layer Codes, by crop group
#https://www.nass.usda.gov/Research_and_Science/Cropland/metadata/metadata_ca19.htm

grains_oils_grass_beans = [1,2,3,4,5,6,10,11,12,13,21,22,23,24,25,26,27,28,29,
                           30,31,32,33,34,35,36,37,38,39,41,42,43,44,45,46,51,
                           52,53,225,226,228,230,232,234,235,236,237,238,239,240,241,254]

deli_crops = [14, 48, 49, 50, 54, 55, 57, 206, 207, 208, 209, 213, 214, 216, 
              219, 221, 222, 224, 227, 229, 231, 242, 243, 244, 245, 246, 247, 
              248, 249, 250]

tree_crops = [66, 67, 68, 69, 72, 74, 75, 76, 77, 204, 210, 211, 212, 215, 217,
              218,220, 223]

crops_list = deli_crops + tree_crops

# binary remapping of Cropland Data Layer to include only Delicate Crops
cld_16_deli = np.isin(cld_16, [crops_list]).astype(int)
cld_17_deli = np.isin(cld_17, [crops_list]).astype(int)
cld_18_deli = np.isin(cld_18, [crops_list]).astype(int)
cld_19_deli = np.isin(cld_19, [crops_list]).astype(int)

# weighting 2019 double
cld_19_deli2 = cld_19_deli*2

# combine all years of the Cropland Data Layer binary array 
four_year_combo = cld_16_deli + cld_17_deli + cld_18_deli + cld_19_deli2

# create binary array where 1 = cultivated for 2 years+ and/or in 2019, 0 = cultivated for only 1 year and not in 2019
four_year_binary = np.isin(four_year_combo, [2,3,4,5]).astype(int)

I would then like to call the imagery collection for each tile and mask all pixels using the corresponding cropland data layer mask. I have not finalized this piece.

# something like this? 
tile_data = {}
for k, tile in tqdm(enumerate(tiles['features'])):
    s2_data = s2_daily.compute(tile)
    l8_data = l8_daily.compute(tile) 
    s1_data = s1_daily.compute(tile)
    # stack all these bands
    # mask with cropland data layer to keep only areas that have grown desired crops
    }

At this point, once I have my stack of masked imagery, I would like to extract training data from the 2019 imagery using the 2019 Cropland Data Layer classification. How would I approach this in the Descartes Labs Platform at scale?

2 Answers

To run at scale, rather than calling .compute in a for-loop over each tile, you can submit all the tiles to the backend at once to run in parallel as Workflows Jobs, then asynchronously process them as they complete.

First though, you'll need to fix some edge cases with your cloud masking, and express your CDL crop masking in Workflows.

We'll walk through all of that below, but here's the same thing in notebook form, including interactive visualization and some other improvements which may be easier to follow.

Better cloud masking

l8_stack.concat_bands(l8_cloud_mask) assumes that l8_stack and l8_cloud_mask are the same length, which won't always be the case—some scenes might not have cloud masks processed. Here's a function to efficiently join the two, pick and mask only the scenes that have cloud masks, and construct the daily mosaics. It uses an ImageCollectionGroupby object (like a pandas groupby object) to efficiently lookup from the ImageCollections by date, and mosaic them at the same time:

def cloud_masked_daily_product(
    product_id: str, start_datetime: str, end_datetime: str
) -> wf.ImageCollection:
    "Get a product by ID, masked by the DL cloud mask and mosaicked by day"
    ic = wf.ImageCollection.from_id(product_id, start_datetime, end_datetime)
    cloudmask = (
        wf.ImageCollection.from_id(
            product_id + ":dlcloud:v1", start_datetime, end_datetime
        ).pick_bands("valid_cloudfree")
        == 0
    )

    # Make an ImageCollectionGroupby object, for quicker lookups
    # from `ic` by date (you can use it like a dict)
    ic_date_groupby = ic.groupby(dates=("year", "month", "day"))
    # For each cloudmask date, pick the corresponding image from `ic` by date, mosiac both, and mask them.
    # (Not all scenes have cloudmasks processed, so this ensures we only return scenes that do.)
    return cloudmask.groupby(dates=("year", "month", "day")).map(
        lambda ymd, mask_imgs: ic_date_groupby[ymd].mosaic().mask(mask_imgs.mosaic())
    )

We'll also write a quick NDVI function, rather than using the derived:ndvi band (which can be slower):

def ndvi(ic: wf.ImageCollection) -> wf.ImageCollection:
    nir, red = ic.unpack_bands("nir red")
    ndvi = (nir - red) / (nir + red)
    return ndvi.rename_bands("ndvi")

Putting these together, we can prepare our input imagery more concisely:

l8_daily = cloud_masked_daily_product(
    "landsat:LC08:01:T1:TOAR", start_datetime, end_datetime
).pick_bands("red green blue nir swir1")
l8_with_ndvi = l8_daily.concat_bands(ndvi(l8_daily))

s2_daily = cloud_masked_daily_product(
  "sentinel-2:L1C", start_datetime, end_datetime
).pick_bands("red green blue nir swir1")
s2_with_ndvi = s2_daily.concat_bands(ndvi(s2_daily))

s1 = wf.ImageCollection.from_id(
    "sentinel-1:GRD", start_datetime, end_datetime
).pick_bands("vh vv")
s1_daily = s1.groupby(dates=("year", "month", "day")).mosaic()

Incorporating CDL

Workflows doesn't have a built-in equivalent to np.isin, but we can write one. It's not quite as efficient, but the difference isn't noticeable.

def isin(ic: wf.ImageCollection, values: list) -> wf.ImageCollection:
    "Like np.isin, for Workflows"
    assert len(values) > 0
    result = False
    for value in values:
        result = result | (ic == value)
    return result

Using that, we'll replicate your CDL classification logic:

# picking all 4 years of CDL at once is more efficient
cdl = wf.ImageCollection.from_id(
    "usda:cdl:v1", start_datetime="2016-12-01", end_datetime="2020-01-01"
).pick_bands("class")

grains_oils_grass_beans = [1,2,3,4,5,6,10,11,12,13,21,22,23,24,25,26,27,28,29,
                        30,31,32,33,34,35,36,37,38,39,41,42,43,44,45,46,51,
                        52,53,225,226,228,230,232,234,235,236,237,238,239,240,241,254]

deli_crops = [14, 48, 49, 50, 54, 55, 57, 206, 207, 208, 209, 213, 214, 216,
            219, 221, 222, 224, 227, 229, 231, 242, 243, 244, 245, 246, 247,
            248, 249, 250]

tree_crops = [66, 67, 68, 69, 72, 74, 75, 76, 77, 204, 210, 211, 212, 215, 217,
            218,220, 223]

crops_list = deli_crops + tree_crops

is_crops = isin(cdl, crops_list)
is_crops_19 = is_crops[-1]

four_year_combo = is_crops.sum(axis="images") + is_crops_19  # double-weight 2019

# create binary array where 1 = cultivated for 2 years+ and/or in 2019,
# 0 = cultivated for only 1 year and not in 2019
four_year_binary = four_year_combo >= 2

# invert it to mask form, where True means masked
cdl_mask = ~four_year_binary

Then, we apply the cdl_mask to all of our training imagery:

l8_masked = l8_with_ndvi.mask(cdl_mask)
s2_masked = s2_with_ndvi.mask(cdl_mask)
s1_masked = s1_daily.mask(cdl_mask)

To fetch this data, you can compute all three ImageCollections at once, which is faster because it will happen in parallel, and the CDL mask will be reused for all three:

l8_data, s2_data, s1_data = wf.compute(
  [l8_masked.ndarray, s2_masked.ndarray, s1_masked.ndarray], tile
)

Now you have all of your input arrays, masked by CDL!

One thing to notice is that because the three satellites have different revisit rates, these three imagery stacks are different lengths. If your model requires having the same number of input scenes from each sensor, you'll need to add more logic to composite or filter the scenes down.

Also, I noticed that many of your tiles don't contain any of the crops you're looking for—they'd all be masked out. In the notebook I have code for filtering out these tiles ahead-of-time.

Running at scale

Submitting all the tiles at once will be much faster than calling .compute one-at-a-time, but also requires some boilerplate code.

We'll call .compute with block=False to immediately get a Job object, instead of waiting for the job to finish. Then, we'll write a function to check which Jobs are done, and process them in whatever order they finish.

# submit all the tiles to run, asynchronously
jobs = [
  wf.compute(
    [l8_masked.ndarray, s2_masked.ndarray, s1_masked.ndarray],
    tile,
    block=False
  )
  for tile in tqdm(tiles_to_run)
]

Then here's a generator function to iterate over the jobs as they complete:

from typing import Iterator, Sequence, Optional
import time

def as_completed(jobs: Sequence[wf.Job], interval_sec: Optional[int] = None) -> Iterator[wf.Job]:
    """
    Iterator over Jobs that yields each Job when it completes.
    
    Parameters
    ----------
    jobs: Sequence[wf.Job]
        Jobs to wait for
    interval_sec: Optional[int], default None
        Wait at least this many seconds between polling for job updates.
        If None (default), uses ``max(5, len(jobs) // 5)``.
        
    Yields
    ------
    job: wf.Job
        A completed job (either succeeded or failed).
    """
    jobs = list(jobs)
    if interval_sec is None:
        inverval_sec = max(5, len(jobs) // 5)
    while len(jobs) > 0:
        loop_start = time.perf_counter()

        i = 0
        while i < len(jobs):
            job = jobs[i]
            if not job.done:  # in case it's already loaded
                try:
                    job.refresh()
                except Exception:
                    continue  # be resilient to transient errors for now

            if job.done:
                yield job
                del jobs[i]  # "advances" i
            else:
                i += 1

        loop_duration = time.perf_counter() - loop_start
        if len(jobs) > 0 and loop_duration < interval_sec:
            time.sleep(interval_sec - loop_duration)

Finally, we'll use this function to iterate over the jobs list and handle each result:

failed = []
for job in as_completed(jobs):
    if job.error is not None:
        failed.append(job)
        print(job.error)
    else:
        l8_data, s2_data, s1_data = job.result(progress_bar=False)
        handle_result(l8_data, s2_data, s1_data)

Again, here is all of this in a working notebook, which also has examples of you'd visualize all the intermediate parts on an interactive map.

Correct answer by caitlin kontgis on November 18, 2020

In response to the comment regarding the error "failed with: code=ERROR_INVALID, message='Cannot access the ndarray attribute on an empty ImageCollection.' Do you know why this error would appear when we run this same script for 2020?":

The empty Image Collection error is due to an empty Landsat 8 dlcloud image collection for the specified time range. I would suggest using the native cloud mask to filter the Landsat 8 image collection, and continuing to use the dlcloud mask to filter the Sentinel-2 image collection. Use the function below for native cloud masking:

def native_masked_daily_product(
 product_id: str, start_datetime: str, end_datetime: str
) -> wf.ImageCollection:
 "Get a product by ID, masked by the native cloud mask and mosaicked by day"
 ic = wf.ImageCollection.from_id(product_id, start_datetime, end_datetime)
 cloudmask = ic.pick_bands("valid-cloudfree") == 0
 return ic.mask(cloudmask).groupby(dates=("year", "month", "day")).mosaic()

You can then substitute

l8_daily = masked_daily_product(
    "landsat:LC08:01:T1:TOAR", start_datetime, end_datetime
).pick_bands("red green blue nir swir1")

with 

l8_daily = native_masked_daily_product(
    "landsat:LC08:01:T1:TOAR", start_datetime, end_datetime
).pick_bands("red green blue nir swir1")

and this should fix the problem.

Answered by Rose Rustowicz on November 18, 2020

Add your own answers!

Ask a Question

Get help from others!

© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP