import { MonoTypeOperatorFunction ,  Observable ,  ConnectableObservable } from 'rxjs';
import { finalize, publishReplay, refCount, shareReplay, take } from 'rxjs/operators';

/**
 * Pipes the given source Observable through publishReplay and connects to the resulting ConnectableObservable
 * to make the stream hot.
 * Note that the observable sequence will only complete, when the source observable completes.
 * @param unsubscribeWhen An optional observable that controls unsubscription from the underlying stream. When this stream emits an item,
 * completes or errors out, the shared subscription to the underlying stream is unsubscribed.
 */
export function connectedPublishReplay<T> (unsubscribeWhen?: Observable<any>): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => {
    const publishReplayedSource$ = source.pipe(
      publishReplay(1)
    );
    const hotSubscription = (<ConnectableObservable<T>> publishReplayedSource$).connect();
    if (unsubscribeWhen) {
      unsubscribeWhen
        .pipe(
          take(1),
          finalize(() => hotSubscription.unsubscribe())
        ).subscribe();
    }
    return publishReplayedSource$;
  };
}

/**
 * Share the given source observable. Connects to the source observable when a subscriber is subscribed,
 * disconnects from the source observable, when the last subscriber disconnects.
 * See https://blog.strongbrew.io/share-replay-issue/
 */
export function lazyPublishReplay<T> (): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => {
    return source.pipe(
      publishReplay(1),
      refCount()
    );
  };
}
