import {defer, MonoTypeOperatorFunction, ReplaySubject} from 'rxjs';

import {takeUntilDestroyed} from './take-until-destroyed';

/**
 * Operator that caches the source observable. The `cacheSize` parameter can be used to limit the
 * amount of emitted items to store in the cache.
 *
 * This resulting observable is only partially linked to the source observable. The source
 * observable will only be subscribed to once the resulting observable is subscribed to at least
 * once, but the source observable is not unsubscribed from once the resulting observable loses
 * all subscriptions. To prevent memory leaks, a `destroyable` must be passed. Once the destroyable
 * is destroyed, the source observable is unsubscribed from and the resulting observable completes.
 *
 * If you want to unsubscribe from the source observable once all subscriptions to the resulting
 * observable are unsubscribed, please take a look at RxJs's `shareReplay` operator.
 *
 * Example usage to cache the result of a connector:
 *
 * ```ts
 * // in a service
 *
 * // Subscribing to this observable for the first time will launch the actual request
 * public readonly foo$: Observable<Foo> = connector.prepare(call).pipe(
 *   retry(2),
 *   cache(this),
 * );
 * ```
 *
 * @param destroyable Destroyable that will end the subscription on the source once destroyed
 * @param cacheSize The maximum number of entries to keep in the cache.
 * @see http://reactivex.io/documentation/operators/replay.html
 */
export function cache<T>(destroyable: any, cacheSize?: number): MonoTypeOperatorFunction<T> {
  // Create the takeUntil operator instance here to ensure the browser can drop the reference to the
  // `destroyable` parameter, otherwise we might create a circular reference if someone stores a
  // result of this cache function.
  const takeUntil = takeUntilDestroyed<T>(destroyable);

  return source => {
    const target = new ReplaySubject<T>(cacheSize);
    const targetObservable = target.asObservable();

    let subscribed = false;

    return defer(() => {
      if (!subscribed) {
        subscribed = true;
        source.pipe(takeUntil).subscribe(target);
      }

      return targetObservable;
    });
  };
}
