import { Observable, of, Subject } from 'rxjs';
import { delay, tap } from 'rxjs/operators';
import { cloneDeep, identity, pickBy } from 'lodash';
import {
  BackgroundTask,
  BackgroundTaskJob,
  BackgroundTaskMessage,
  BackgroundTaskProgress,
  BackgroundTaskStatus,
  WorkerTaskConfig,
} from '../models/background-task.model';
import { JobManager } from './job-manager';
import { AvailableWorkers } from '../workers';

export class JobQueue {
  constructor(private jobs: BackgroundTaskJob[]) {}

  addJobs(...jobs: BackgroundTaskJob[]) {
    this.jobs.push(...jobs);
  }

  getJob(): BackgroundTaskJob | null {
    return this.jobs.shift();
  }

  isEmpty(): boolean {
    return this.jobs.length === 0;
  }
}

export class WorkerTask {
  private readonly _task: BackgroundTask;
  private readonly _progress: BackgroundTaskProgress;
  private readonly _change: Subject<BackgroundTaskMessage> =
    new Subject<BackgroundTaskMessage>();
  private readonly jobManager: JobManager;
  private readonly queue: JobQueue;

  private pendingJobs: BackgroundTaskJob[] = [];
  private config: WorkerTaskConfig = {
    concurrentJobs: 1,
    delay: 0,
    break: true,
  };

  private readonly worker: (
    job: BackgroundTaskJob,
  ) => Observable<BackgroundTaskJob>;

  get task(): BackgroundTask {
    return this._task;
  }

  get progress(): BackgroundTaskProgress {
    return this._progress;
  }

  constructor(
    task: BackgroundTask,
    config: WorkerTaskConfig = {
      break: true,
    },
  ) {
    this._task = cloneDeep(task);
    this._progress = {
      totalJobs: this.task.jobs.length,
      doneJobs: 0,
      progressPercent: this.task.jobs.length > 0 ? 0 : 100,
    };
    const { name } = this.task;
    Object.assign(this.config, config);

    const _w = AvailableWorkers.find((a) => a['name'] === name);
    const worker = task.hasOwnProperty('worker')
      ? task['worker']
      : _w && _w.hasOwnProperty('worker')
        ? _w['worker']
        : (job) => of(job).pipe(delay(100));
    this.jobManager = new JobManager(worker, this.config);
    this.queue = new JobQueue(this.task.jobs);
    this.changeTaskStatus('PAUSED');
  }

  get onmessage(): Subject<BackgroundTaskMessage> {
    return this._change;
  }

  start() {
    this.resume();
  }

  resume() {
    this.changeTaskStatus('RUNNING');
    this.runTask();
  }

  pause() {
    this.cancelPendingJobs();
    this.changeTaskStatus('PAUSED');
  }

  status() {
    this.publishMessage();
  }

  stop() {
    this.changeTaskStatus('STOPPED');
    this.terminateTask();
  }

  private cancelPendingJobs() {
    this.pendingJobs.forEach((job) => this.jobManager.cancelJob(job.id));
  }

  private terminateTask() {
    this.cancelPendingJobs();
    this.changeTaskStatus('TERMINATED');
  }

  terminate() {
    this._change.unsubscribe();
  }

  private runTask() {
    if (!this.task || this.task.status !== 'RUNNING') {
      // exit loop if task is not running
      return;
    }
    if (this.pendingJobs.length < this.config.concurrentJobs) {
      if (this.queue.isEmpty() && this.pendingJobs.length === 0) {
        // task is over
        this.terminateTask();
        return;
      }
      const job: BackgroundTaskJob = this.queue.getJob();
      if (job) {
        this.pendingJobs.push(job);
        this.jobManager
          .performJob(job, (...args) => this.jobOver(...args))
          .subscribe({
            next: (e) => {},
            error: (err) => this._change.error(err),
          });

        this.runTask();
      }
    }
  }

  /**
   * callback when a job is over
   */
  private jobOver(job: BackgroundTaskJob, { payload, success }) {
    // remove pending job
    const jobIndex = this.pendingJobs.indexOf(job);
    if (jobIndex !== -1) {
      this.pendingJobs.splice(jobIndex, 1);
    }

    if (!success) {
      this.queue.addJobs(job);
      of([])
        .pipe(
          delay(100),
          tap(() => {
            this.runTask();
          }),
        )
        .subscribe();
      return;
    }

    this._task.payload = payload;
    this._progress.doneJobs++;
    this._progress.progressPercent = Math.ceil(
      (100 * this._progress.doneJobs) / this._progress.totalJobs,
    );

    this.publishMessage();

    this.runTask();
  }

  private changeTaskStatus(status: BackgroundTaskStatus) {
    if (this.task && this.task.status !== status) {
      this._task.status = status;
      this.publishMessage();
    }
  }

  private publishMessage() {
    const { id, payload, status } = this.task;
    const message: any = pickBy(
      {
        taskId: id,
        payload,
        taskStatus: status,
        progress: this.progress,
      },
      identity,
    );

    // console.log(JSON.stringify(message, null, 4));
    this._change.next(message);
  }
}
