TransWikia.com

numpy create array of the max of consecutive pairs in another array

Stack Overflow Asked by GalSuchetzky on November 27, 2020

I have a numpy array:

A = np.array([8, 2, 33, 4, 3, 6])

What I want is to create another array B where each element is the pairwise max of 2 consecutive pairs in A, so I get:

B = np.array([8, 33, 33, 4, 6])

Any ideas on how to implement?
Any ideas on how to implement this for more then 2 elements? (same thing but for consecutive n elements)

Edit:

The answers gave me a way to solve this question, but for the n-size window case, is there a more efficient way that does not require loops?

Edit2:

Turns out that the question is equivalent for asking how to perform 1d max-pooling of a list with a window of size n.
Does anyone know how to implement this efficiently?

7 Answers

A loop-free solution is to use max on the windows created by skimage.util.view_as_windows:

list(map(max, view_as_windows(A, (2,))))
[8, 33, 33, 4, 6]

Copy/pastable example:

import numpy as np
from skimage.util import view_as_windows

A = np.array([8, 2, 33, 4, 3, 6])

list(map(max, view_as_windows(A, (2,))))

Correct answer by Nicolas Gervais on November 27, 2020

Using Pandas:

A = pd.Series([8, 2, 33, 4, 3, 6])
res = pd.concat([A,A.shift(-1)],axis=1).max(axis=1,skipna=False).dropna()

>>res
0     8.0
1    33.0
2    33.0
3     4.0
4     6.0

Or using numpy:

np.vstack([A[1:],A[:-1]]).max(axis=0)

Answered by Binyamin Even on November 27, 2020

a recursive solution, for all of n

import numpy as np
import sys


def recursive(a: np.ndarray, n: int, b=None, level=2):
    if n <= 0 or n > len(a):
        raise ValueError(f'len(a):{len(a)} n:{n}')
    if n == 1:
        return a
    if len(a) == n:
        return np.max(a)
    b = np.maximum(a[:-1], a[1:]) if b is None else np.maximum(a[level - 1:], b)
    if n == level:
        return b
    return recursive(a, n, b[:-1], level + 1)


test_data = np.array([8, 2, 33, 4, 3, 6])
for test_n in range(1, len(test_data) + 2):
    try:
        print(recursive(test_data, n=test_n))
    except ValueError as e:
        sys.stderr.write(str(e))

output

[ 8  2 33  4  3  6]
[ 8 33 33  4  6]
[33 33 33  6]
[33 33 33]
[33 33]
33
len(a):6 n:7

about recursive function

You can observe the following data, and then you will know how to write the recursive function.

"""
np.array([8, 2, 33, 4, 3, 6])
n=2: (8, 2),     (2, 33),    (33, 4),    (4, 3),   (3, 6)  => [8, 33, 33, 4, 6] => B' = [8, 33, 33, 4]
n=3: (8, 2, 33), (2, 33, 4), (33, 4, 3), (4, 3, 6)         => B' [33, 4, 3, 6]  =>  np.maximum([8, 33, 33, 4], [33, 4, 3, 6]) => 33, 33, 33, 6
...
"""

Answered by Carson on November 27, 2020

Here is an approach specifically taylored for larger windows. It is O(1) in window size and O(n) in data size.

I've done a pure numpy and a pythran implementation.

How do we achieve O(1) in window size? We use a "sawtooth" trick: If w is the window width we group the data into lots of w and for each group we do the cumulative maximum from left to right and from right to left. The elements of any in-between window distribute over two groups and the maxima of the intersections are among the cumulative maxima we have computed earlier. So we need a total of 3 comparisons per data point.

benchit (thanks @Divakar) for w=100; my functions are pp (numpy) and winmax (pythran):

enter image description here

For small window size w=5 the picture is more even. Interestingly, pythran still has a huge edge even for very small sizes. They must be doing something right to mimimze call overhead.

enter image description here

python code:

cummax = np.maximum.accumulate
def pp(a,w):
    N = a.size//w
    if a.size-w+1 > N*w:
        out = np.empty(a.size-w+1,a.dtype)
        out[:-1] = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-1:-1]
        out[-1] = a[w*N:].max()
    else:
        out = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-2:-1]
    out[1:N*w-w+1] = np.maximum(out[1:N*w-w+1],
                            cummax(a[w:w*N].reshape(N-1,w),axis=1).ravel())
    out[N*w-w+1:] = np.maximum(out[N*w-w+1:],cummax(a[N*w:]))
    return out

pythran version; compile with pythran -O3 <filename.py>; this creates a compiled module which you can import:

import numpy as np

# pythran export winmax(float[:],int)
# pythran export winmax(int[:],int)

def winmax(data,winsz):
    N = data.size//winsz
    if N < 1:
        raise ValueError
    out = np.empty(data.size-winsz+1,data.dtype)
    nxt = winsz
    for j in range(winsz,data.size):
        if j == nxt:
            nxt += winsz
            out[j+1-winsz] = data[j]
        else:
            out[j+1-winsz] = out[j-winsz] if out[j-winsz]>data[j] else data[j]
    running = data[-winsz:N*winsz].max()
    nxt -= winsz << (nxt > data.size)
    for j in range(data.size-winsz,0,-1):
        if j == nxt:
            nxt -= winsz
            running = data[j-1]
        else:
            running = data[j] if data[j] > running else running
            out[j] = out[j] if out[j] > running else running
    out[0] = data[0] if data[0] > running else running
    return out

Answered by Paul Panzer on November 27, 2020

In this Q&A, we are basically asking for sliding max values. This has been explored before - Max in a sliding window in NumPy array. Since, we are looking to be efficient, we can look further. One of those would be numba and here are two final variants I ended up with that leverage parallel directive that boosts performance over a without version :

import numpy as np
from numba import njit, prange

@njit(parallel=True)
def numba1(a, W):
    L = len(a)-W+1
    out = np.empty(L, dtype=a.dtype)
    v = np.iinfo(a.dtype).min
    for i in prange(L):
        max1 = v
        for j in range(W):
            cur = a[i + j]
            if cur>max1:
                max1 = cur                
        out[i] = max1
    return out 

@njit(parallel=True)
def numba2(a, W):
    L = len(a)-W+1
    out = np.empty(L, dtype=a.dtype)
    for i in prange(L):
        for j in range(W):
            cur = a[i + j]
            if cur>out[i]:
                out[i] = cur                
    return out 

From the earlier linked Q&A, the equivalent SciPy version would be -

from scipy.ndimage.filters import maximum_filter1d

def scipy_max_filter1d(a, W):
    L = len(a)-W+1
    hW = W//2 # Half window size
    return maximum_filter1d(a,size=W)[hW:hW+L]

Benchmarking

Other posted working approaches for generic window arg :

from skimage.util import view_as_windows

def rolling(a, window):
    shape = (a.size - window + 1, window)
    strides = (a.itemsize, a.itemsize)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

# @mathfux's soln
def npmax_strided(a,n):
    return np.max(rolling(a, n), axis=1)

# @Nicolas Gervais's soln
def mapmax_strided(a, W):
    return list(map(max, view_as_windows(a,W)))

cummax = np.maximum.accumulate
def pp(a,w):
    N = a.size//w
    if a.size-w+1 > N*w:
        out = np.empty(a.size-w+1,a.dtype)
        out[:-1] = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-1:-1]
        out[-1] = a[w*N:].max()
    else:
        out = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-2:-1]
    out[1:N*w-w+1] = np.maximum(out[1:N*w-w+1],
                            cummax(a[w:w*N].reshape(N-1,w),axis=1).ravel())
    out[N*w-w+1:] = np.maximum(out[N*w-w+1:],cummax(a[N*w:]))
    return out

Using benchit package (few benchmarking tools packaged together; disclaimer: I am its author) to benchmark proposed solutions.

import benchit
funcs = [mapmax_strided, npmax_strided, numba1, numba2, scipy_max_filter1d, pp]
in_ = {(n,W):(np.random.randint(0,100,n),W) for n in 10**np.arange(2,6) for W in [2, 10, 20, 50, 100]}
t = benchit.timings(funcs, in_, multivar=True, input_name=['Array-length', 'Window-length'])
t.plot(logx=True, sp_ncols=1, save='timings.png')

enter image description here

So, numba ones are great for window sizes lower than 10, at which there's no clear winner and on larger window sizes pp wins with SciPy one at second spot.

Answered by Divakar on November 27, 2020

In case there are consecutive n items, extended solution requires looping:

np.maximum(*[A[i:len(A)-n+i+1] for i in range(n)])

In order to avoid it you can use stride tricks and convert A to array of n-length blocks:

def rolling(a, window):
    shape = (a.size - window + 1, window)
    strides = (a.itemsize, a.itemsize)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

For example:

>>> rolling(A, 3)
array([[ 8,  2,  8],
   [ 2,  8, 33],
   [ 8, 33, 33],
   [33, 33,  4]])

After it's done you can kill it with np.max(rolling(A, n), axis=1).

Though, despite its elegance, neither this solution nor first one were not efficient because we apply repeatedly maximum on adjacent blocks that differs by two items only.

Answered by mathfux on November 27, 2020

One solution to the pairwise problem is using the np.maximum function and array slicing:

B = np.maximum(A[:-1], A[1:])

Answered by GalSuchetzky on November 27, 2020

Add your own answers!

Ask a Question

Get help from others!

© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP