import { Observable, Subject, switchMap, timer } from 'rxjs';
import { tap } from 'rxjs/operators';
import {
  WebsocketAnyNotificationMessage,
  WebsocketNotificationInterface,
  WebsocketProgressInterface,
} from '../models/websocket.model';

/** Message shape consumed by the `WsProgressToast` operator: a websocket notification whose data is a progress payload. */
type SourceType = WebsocketNotificationInterface<WebsocketProgressInterface>;
/**
 * Callbacks a `StreamHandler` uses to report the state of one tracked action.
 * Every callback receives the handler id as `groupId`.
 */
type ProgressToasts = {
  /**
   * Called on each progress update with the current percentage.
   * `counter.processing` receives the handler's processed count, `counter.all` its total,
   * and `counter.errors` its per-message occurrence counters.
   */
  PROGRESS: (
    groupId: string,
    percent: number,
    counter?: {
      processing: number;
      all: number;
      errors: WebsocketAnyNotificationMessage;
    },
  ) => void;
  /** Called when the handler completes, and also when its inactivity timer fires. */
  SUCCESS: (groupId: string) => void;
  /** Called when the handler's internal state stream errors. */
  ERROR: (groupId: string) => void;
};

/** Fallback handler id used when an incoming message carries no `data.action`. */
const UNKNOWN_ACTION = 'UNKNOWN_ACTION';

/**
 * Module-wide registry of active handlers keyed by id.
 * Handlers add themselves in their constructor and delete themselves when destroyed.
 */
const streamCollector: Record<string, StreamHandler> = {};

/**
 * Tracks the progress of one websocket-driven action and reports it through toasts.
 *
 * An instance registers itself in `streamCollector` under its id on construction and
 * removes itself once it is finished (completion, state-stream error, or inactivity timeout).
 */
class StreamHandler {
  /** Expected number of items; accumulated from the `total` of the first message seen per uuid. */
  total = 0;
  /** Count of messages that carried a truthy `processed` value (or the value from a summary message). */
  processed = 0;
  /** Count of messages that carried a truthy `not_processed` value (or the value from a summary message). */
  notProcessed = 0;
  /** Message uuids whose `total` has already been added to `total`. */
  uuids = new Set<string>();

  /** Occurrence count per reported `message` value. */
  counters: Record<string, number> = {};

  // Each emitted value (re)starts an inactivity timer; switchMap drops the previous timer.
  #timeoutAt$ = new Subject<number>();
  #timeoutAtSubscription = this.#timeoutAt$
    .pipe(switchMap((milliseconds) => timer(milliseconds)))
    .subscribe(() => {
      // NOTE(review): the timeout path reports SUCCESS and does not call finishAction — confirm this is intended.
      this.toasts?.SUCCESS(this.id);
      this.#destroy();
    });

  #nextState$ = new Subject<void>();
  #stateSubscription = this.#nextState$.subscribe({
    next: () => {
      this.toasts?.PROGRESS(this.id, this.percent, {
        all: this.total,
        processing: this.processed,
        errors: this.counters,
      });

      // Wait longer while no item has been counted yet, shorter once progress has started.
      this.#timeoutAt$.next(this.totalReceived === 0 ? 50_000 : 15_000);
    },
    complete: () => {
      this.toasts?.SUCCESS(this.id);
      this.finishAction?.();
      this.#destroy();
    },
    error: () => {
      this.toasts?.ERROR(this.id);
      this.#destroy();
    },
  });

  /**
   * @param id key under which this handler is stored in `streamCollector`; also passed to every toast callback
   * @param toasts optional callbacks used to report progress, success and error
   * @param finishAction optional callback invoked once when the handler completes
   */
  constructor(
    private readonly id: string,
    private readonly toasts?: ProgressToasts,
    private readonly finishAction?: () => void,
  ) {
    streamCollector[id] = this;
  }

  /** Counts one more processed item. */
  addProcessed(): void {
    this.processed += 1;
  }

  /** Counts one more item that was not processed. */
  addNotProcessed(): void {
    this.notProcessed += 1;
  }

  /** Increments the occurrence counter kept for the given message value. */
  addError(error: WebsocketNotificationInterface<any>['data']['message']): void {
    this.counters[error] = (this.counters[error] ?? 0) + 1;
  }

  /** Number of items accounted for so far, processed or not. */
  get totalReceived(): number {
    return this.notProcessed + this.processed;
  }

  /**
   * Progress as an integer percentage, rounded up and never above 100.
   * Returns 100 while no total is known.
   */
  get percent(): number {
    if (!this.total) {
      return 100;
    }

    // Clamp: more messages than `total` (e.g. duplicates) must not yield a value above 100.
    return Math.min(100, Math.ceil((this.totalReceived / this.total) * 100));
  }

  /**
   * True when every expected item has been accounted for and at least one was processed.
   * The second condition also keeps a fresh handler (all counters 0) from counting as complete.
   * NOTE(review): a run where every item was not processed never completes here and ends via the timeout — confirm.
   */
  isComplete(): boolean {
    return this.totalReceived === this.total && this.totalReceived !== this.notProcessed;
  }

  /**
   * Folds one websocket message into the counters, then either completes the handler
   * or emits a progress update (which also restarts the inactivity timer).
   *
   * @param msg incoming notification; missing numeric fields are treated as 0
   */
  messageReceived(msg: WebsocketNotificationInterface<WebsocketProgressInterface>): void {
    const processed = msg.data?.processed ?? 0;
    const notProcessed = msg.data?.not_processed ?? 0;
    const total = msg.data?.total ?? 0;
    const message = msg.data?.message;

    if (processed) {
      this.addProcessed();
    }
    if (notProcessed) {
      this.addNotProcessed();
    }
    if (message) {
      this.addError(message);
    }

    // A uuid contributes its total only once, however many messages share it.
    if (msg.uuid && !this.uuids.has(msg.uuid)) {
      this.uuids.add(msg.uuid);
      this.total += total;
    }

    // A message whose own counters add up to its total is taken as a summary and replaces the
    // accumulated state. `total > 0` keeps a message without counters (0 + 0 === 0) from
    // wiping the progress gathered so far.
    if (total > 0 && !this.isComplete() && processed + notProcessed === total) {
      this.processed = processed;
      this.notProcessed = notProcessed;
      this.total = total;
    }

    if (this.isComplete()) {
      // Completing tears the handler down; there is nothing left to emit afterwards.
      this.#nextState$.complete();
      return;
    }

    this.#nextState$.next();
  }

  /** Stops both internal subscriptions and removes this handler from `streamCollector`. */
  #destroy(): void {
    this.#stateSubscription.unsubscribe();
    this.#timeoutAtSubscription.unsubscribe();
    delete streamCollector[this.id];
  }
}

/**
 * Builds an RxJS operator that feeds every message of the source stream into a `StreamHandler`
 * and passes the message through unchanged.
 *
 * Handlers are looked up in `streamCollector` by `msg.data.action`, falling back to
 * `UNKNOWN_ACTION` when the action is missing. `toasts` and `finishAction` are only used
 * when no handler exists yet for that id and a new one has to be created.
 *
 * @param toasts callbacks handed to a newly created handler
 * @param finishAction optional callback handed to a newly created handler
 * @returns an operator function over `SourceType` streams
 */
export function WsProgressToast(
  toasts: ProgressToasts,
  finishAction?: () => void,
): (source$: Observable<SourceType>) => Observable<SourceType> {
  const forwardToHandler = (msg: SourceType): void => {
    const action = msg?.data?.action as string;
    const handlerId = action ?? UNKNOWN_ACTION;

    // Reuse the handler already tracking this id; otherwise create one (it registers itself).
    let handler = streamCollector[handlerId];
    if (handler == null) {
      handler = new StreamHandler(handlerId, toasts, finishAction);
    }

    handler.messageReceived(msg);
  };

  return (source$) => source$.pipe(tap(forwardToHandler));
}
