TransWikia.com

RXJS Queue to accumulate data values every second and dispatch after every x seconds

Stack Overflow Asked by Chukwuma Nwaugha on December 16, 2021

I am trying to create a smart queue that collects data every second, or whenever I call .next(), and every x seconds (say x = 5) dispatches all the items currently in the queue while continuing to accept new ones.

The idea is to send one HTTP request to the server every x = 5 seconds, so that a batch of items is sent at once instead of one item every second.

The snippet below is what I have tried. I am accumulating the values with the scan operator, but I need a combination of operators that drains the queue every x = 5 seconds.

import { Subject, Subscription, Observable } from 'rxjs'
import { concatMap, mergeMap, map, delay, scan } from 'rxjs/operators'
import { Video, seconds } from '../types'

interface Queue {
  addMoment: (data: { t: seconds }) => void
  unsubscribe: () => void
}

type Moment = {
  t: seconds
}

const PER_BATCH = 5
const INTERVAL = PER_BATCH * 1000

export class MomentsQueue implements Queue {
  private queue = new Subject()
  private observer: Observable<any>
  private subscriber: Subscription | undefined

  constructor(private video: Video) {
    this.observer = this.queue.pipe(
      map((payload) => {
        console.log(payload)
        return payload as Moment
      }),
      scan((all: Moment[], current) => [...all, current], []),
      concatMap(
        (payload) =>
          new Promise((resolve) => {
            console.log({ payload })
            resolve(true)
          }),
      ),
    )
  }

  private subscribe() {
    this.subscriber = this.observer.subscribe()
    // ?? this.unsubscribe() // based on any chosen event
  }

  unsubscribe() {
    this.subscriber?.unsubscribe()
  }

  addMoment(data: Moment) {
    if (!this.subscriber || this.subscriber.closed) this.subscribe()

    this.queue.next({ t: data.t })
  }
}

export default MomentsQueue
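
To make the problem with the attempt above concrete: with a seed, scan emits its running accumulator once per incoming value, so the concatMap step receives an ever-growing array on every .next() rather than one batch per interval. The sketch below is a plain-TypeScript stand-in for that behaviour, not RxJS itself; scanEmissions is a hypothetical helper written only for this illustration.

```typescript
type Moment = { t: number }

// Hypothetical stand-in for RxJS `scan` with a seed: returns every
// intermediate accumulator value, one per input item, in emission order.
function scanEmissions<T, A>(
  items: T[],
  reducer: (acc: A, item: T) => A,
  seed: A,
): A[] {
  const emissions: A[] = []
  let acc = seed
  for (const item of items) {
    acc = reducer(acc, item)
    emissions.push(acc) // one emission per incoming item
  }
  return emissions
}

const moments: Moment[] = [{ t: 1 }, { t: 2 }, { t: 3 }]

// Same reducer as in the attempt: append the current item to the array.
const emitted = scanEmissions<Moment, Moment[]>(
  moments,
  (all, current) => [...all, current],
  [],
)

// Three inputs give three emissions, each one item longer than the last.
console.log(emitted.map((batch) => batch.length))
```

Nothing in that pipeline is tied to a clock, which is why an operator that groups by time is needed.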


One Answer

I was able to solve this with @TalOhania's help by replacing the scan operator with bufferTime(INTERVAL). See the solution in the code snippet below:

import { Subject, Subscription, Observable } from 'rxjs'
import { concatMap, map, bufferTime } from 'rxjs/operators'
import { Video, seconds } from '../types'

interface Queue {
  addMoment: (data: { t: seconds }) => void
  unsubscribe: () => void
}

type Moment = {
  t: seconds
}

const PER_BATCH = 5
const INTERVAL = PER_BATCH * 1000

export class MomentsQueue implements Queue {
  private queue = new Subject()
  private observer: Observable<any>
  private subscriber: Subscription | undefined

  constructor(private video: Video) {
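    this.observer = this.queue.pipe(
      // The Subject is untyped, so narrow each value to Moment.
      map((payload) => payload as Moment),
      // Collect everything received during each INTERVAL ms window into one array.
      // Note: bufferTime also emits an empty array when a window had no items.
      bufferTime(INTERVAL),
      // Handle the batches one at a time, in order.
      concatMap(
        (payload) =>
          new Promise((resolve) => {
            // send the payload (an array of Moment) to the server.
            console.log({ payload })
            resolve(true)
          }),
      ),
    )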
  }

  private subscribe() {
    this.subscriber = this.observer.subscribe()
    // ?? this.unsubscribe() // based on any chosen event
  }

  unsubscribe() {
    this.subscriber?.unsubscribe()
  }

  addMoment(data: Moment) {
    if (!this.subscriber || this.subscriber.closed) this.subscribe()

    this.queue.next({ t: data.t })
  }
}

export default MomentsQueue
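
To illustrate the time-window batching that bufferTime performs here, below is a minimal plain-TypeScript sketch without RxJS. BatchQueue and its methods (add, flush, start, stop) are hypothetical names, not part of RxJS or of the code above. One deliberate difference, which is an assumption about what is wanted: it skips empty batches, whereas bufferTime emits an empty array for a quiet window.

```typescript
// Hypothetical helper, not part of RxJS: collects items and hands them to
// `send` as one array per flush.
class BatchQueue<T> {
  private pending: T[] = []
  private timer: ReturnType<typeof setInterval> | undefined
  private send: (batch: T[]) => void

  constructor(send: (batch: T[]) => void) {
    this.send = send
  }

  // Queue an item; nothing is sent until the next flush.
  add(item: T): void {
    this.pending.push(item)
  }

  // Send everything queued so far and start a fresh batch.
  // Empty batches are skipped (an assumption; bufferTime would emit []).
  flush(): void {
    if (this.pending.length === 0) return
    const batch = this.pending
    this.pending = []
    this.send(batch)
  }

  // Flush automatically every `intervalMs` milliseconds.
  start(intervalMs: number): void {
    if (this.timer !== undefined) return
    this.timer = setInterval(() => this.flush(), intervalMs)
  }

  // Stop the timer and send whatever is still queued.
  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer)
      this.timer = undefined
    }
    this.flush()
  }
}

// Usage: flush() is called by hand so the behaviour is visible without waiting.
const sent: Array<Array<{ t: number }>> = []
const queue = new BatchQueue<{ t: number }>((batch) => {
  sent.push(batch)
})

queue.add({ t: 1 })
queue.add({ t: 2 })
queue.flush() // first batch: two moments
queue.flush() // nothing queued, so nothing is sent
queue.add({ t: 3 })
queue.flush() // second batch: one moment
```

In real use one would call queue.start(5000) once and queue.stop() on teardown. With the RxJS version, a similar effect on empty windows could be had by filtering out zero-length arrays after bufferTime.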

Answered by Chukwuma Nwaugha on December 16, 2021
