import {
    defer,
    from,
    Observable,
    type ObservableInput,
    type OperatorFunction,
    Subject,
    type Subscriber,
    throwError,
    timer,
} from 'rxjs';
import {exhaustMap, finalize, mergeMap, retryWhen, throttle} from 'rxjs/operators';

/**
 * From:
 * - https://github.com/ReactiveX/rxjs/issues/1777
 * - https://github.com/ReactiveX/rxjs/issues/5004
 * - https://github.com/bjoerge/rxjs-exhaustmap-with-trailing
 */

/**
 * Variant of `exhaustMap` that does not drop the latest source value received
 * while an inner observable is still running: once the running inner observable
 * finishes, the most recent value seen in the meantime is projected next.
 *
 * The source is gated by `throttle` (leading and trailing both enabled) whose
 * duration notifier is a per-subscription `Subject`; that subject is fired from
 * the inner observable's `finalize` callback.
 *
 * @param project Maps a source value and its index to the inner observable input.
 * @returns An operator emitting the values of the projected inner observables.
 */
export const exhaustMapWithTrailing =
    <T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R> =>
    (source): Observable<R> =>
        defer(() => {
            // `defer` gives every subscription its own gate subject.
            const release = new Subject<void>();

            // Fired whenever an inner observable is finalized, ending the throttle window.
            const openGate = (): void => {
                release.next();
            };

            const runInner = (value: T, index: number) => from(project(value, index)).pipe(finalize(openGate));

            return source.pipe(
                throttle(() => release, {leading: true, trailing: true}),
                exhaustMap(runInner),
            );
        });

/**
 * Extracts the emitted value type `U` from an `Observable<U>`.
 * Resolves to `never` when `T` is not an `Observable`.
 */
export type InferObservable<T> = T extends Observable<infer U> ? U : never;

/** Options accepted by `genericRetryStrategy` and the retry helpers built on it. */
export type GenericRetryStrategyConfig = {
    /** Maximum number of retries before the error is rethrown; `genericRetryStrategy` defaults this to 3. */
    maxRetryAttempts?: number;
    /**
     * Base delay multiplied by the retry attempt number and passed to RxJS `timer`
     * (i.e. milliseconds); `genericRetryStrategy` defaults this to 1000.
     */
    scalingDuration?: number;
};

// Taken from https://www.learnrxjs.io/learn-rxjs/operators/error_handling/retrywhen
/**
 * Builds a notifier function (suitable for `retryWhen`) that retries with a
 * linearly growing delay and gives up after a fixed number of attempts.
 *
 * @param config Optional settings; `maxRetryAttempts` defaults to 3 and
 *   `scalingDuration` defaults to 1000.
 * @returns A function mapping the stream of errors to a stream that emits after
 *   `attempt * scalingDuration` for each retry, or errors with the original
 *   error once `maxRetryAttempts` has been exceeded.
 */
export const genericRetryStrategy =
    ({maxRetryAttempts = 3, scalingDuration = 1000}: GenericRetryStrategyConfig = {}) =>
    (attempts: Observable<any>) =>
        attempts.pipe(
            mergeMap((error, index) => {
                // `index` is zero-based; attempts are counted from 1.
                const attemptNumber = index + 1;
                const retriesExhausted = attemptNumber > maxRetryAttempts;

                // Either surface the original error, or wait 1x, 2x, ... the scaling duration before retrying.
                return retriesExhausted ? throwError(error) : timer(attemptNumber * scalingDuration);
            }),
        );

/**
 * Runs a promise-returning factory and retries it according to
 * `genericRetryStrategy` when it rejects.
 *
 * @param factory Called once per attempt to create a fresh promise.
 * @param config Optional retry settings forwarded to `observableWithRetry$`.
 * @returns A promise produced by `toPromise()` on the retried observable.
 */
export const promiseWithRetry = <T extends Promise<any>>(factory: () => T, config?: GenericRetryStrategyConfig) => {
    // `defer` re-invokes the factory on every (re)subscription, so each retry
    // gets a new promise instead of re-using the already settled one.
    const freshAttempt$ = defer(() => from(factory()));

    return observableWithRetry$(freshAttempt$, config).toPromise();
};

/**
 * Wraps an observable so that it is re-subscribed on error following
 * `genericRetryStrategy`.
 *
 * @param importPromise The source to retry (typed as an `Observable`, despite the parameter name).
 * @param config Optional retry settings forwarded to `genericRetryStrategy`.
 * @returns The source piped through `retryWhen` with the configured strategy.
 */
export const observableWithRetry$ = <T>(importPromise: Observable<T>, config?: GenericRetryStrategyConfig) => {
    const retryOnError = retryWhen<T>(genericRetryStrategy(config));

    return from(importPromise).pipe(retryOnError);
};

/**
 * TAKEN FROM https://github.com/rxjs-ninja/rxjs-ninja/blob/main/libs/rxjs/utility/src/lib/from-readable-stream.ts
 *
 * Creates an Observable source from a ReadableStream source that will emit any
 * values emitted by the stream.
 *
 * The stream is piped into an internal WritableStream on subscription.
 * Unsubscribing aborts that pipe, which also cancels the underlying stream, so
 * no further chunks are read once nobody is listening.
 *
 * @category Streams
 *
 * @see {@link https://stackblitz.com/edit/rxjs-readable-stream|StreamAPI Number Stream}
 * @see {@link https://stackblitz.com/edit/rxjs-readable-stream-fetch|Fetch + StreamAPI Demo}
 *
 * @param stream The ReadableStream to subscribe to
 * @param signal Optional {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal|AbortSignal} to provide
 *   to the underlying stream
 * @param queueStrategy Optional strategy for backpressure queueing
 * @param throwEndAsError Optional to return an error when the `AbortSignal` has been fired instead of just closing
 *
 * @example Create a ReadableStream of `0` to `99` and convert to an Observable
 * ```ts
 * const stream = new ReadableStream({
 *   start: (controller) => {
 *    for (let i = 0; i < 100; i++) {
 *      controller.enqueue(i)
 *    }
 *    controller.close();
 *   }
 * });
 *
 * fromReadableStream(stream).pipe(reduce((a, b) => a + b)).subscribe();
 * ```
 * Output: `4950`
 *
 * @returns Observable that emits from a ReadableStream source
 */
export function fromReadableStream<T>(
    stream: ReadableStream<T>,
    signal?: AbortSignal,
    queueStrategy?: QueuingStrategy,
    throwEndAsError = false,
): Observable<T> {
    /**
     * Builds the WritableStream sink that forwards chunks and end-of-stream
     * notifications to the subscriber.
     *
     * @private
     * @internal
     * @param subscriber
     */
    function createStream(subscriber: Subscriber<T>) {
        return new WritableStream<T>(
            {
                write: value => subscriber.next(value),
                abort: error => {
                    if (throwEndAsError) {
                        subscriber.error(error);
                        /* istanbul ignore next-line */
                    } else if (!subscriber.closed) {
                        subscriber.complete();
                    }
                },
                close: () => {
                    /* istanbul ignore next-line */
                    if (!subscriber.closed) {
                        subscriber.complete();
                    }
                },
            },
            queueStrategy,
        );
    }

    return new Observable<T>(subscriber => {
        // `pipeTo` locks the stream, so it cannot be cancelled directly while the
        // pipe is running. A per-subscription controller lets teardown abort the
        // pipe, which in turn cancels the source stream.
        const pipeController = new AbortController();

        // Mirror the caller's signal (and its reason) onto the internal controller.
        const forwardAbort = (): void => pipeController.abort(signal?.reason);
        if (signal?.aborted) {
            forwardAbort();
        } else {
            signal?.addEventListener('abort', forwardAbort, {once: true});
        }

        stream
            .pipeTo(createStream(subscriber), {signal: pipeController.signal})
            .then(() => {
                /* istanbul ignore next-line */
                return !subscriber.closed && subscriber.complete();
            })
            .catch(error => {
                // Rejections caused by our own teardown arrive after the subscriber is closed.
                if (!subscriber.closed) {
                    subscriber.error(error);
                }
            });

        return () => {
            signal?.removeEventListener('abort', forwardAbort);
            // Stops an in-flight pipe; a no-op when the pipe has already settled.
            pipeController.abort();
            if (!stream.locked) {
                // Best-effort cleanup: cancelling an already errored stream rejects, which is not actionable here.
                void stream.cancel().catch(() => undefined);
            }
        };
    });
}
