import {
  BackgroundTaskJob,
  WorkerTaskConfig,
} from '../models/background-task.model';
import { Observable, Subject } from 'rxjs';
import { delay, take } from 'rxjs/operators';

interface JobWrapper {
  job: any;
  canceled?: boolean;
}

export class JobManager {
  private pendingJobs: Map<string, JobWrapper> = new Map<string, JobWrapper>();

  constructor(
    private worker: (job: BackgroundTaskJob) => Observable<BackgroundTaskJob>,
    private readonly config: WorkerTaskConfig = {
      delay: 0, // defaults
      break: true, //defaults
    },
  ) {
    this.config = Object.assign({ break: true }, this.config);
  }

  cancelJob(jobId: string) {
    this.pendingJobs.get(jobId).canceled = true;
  }

  performJob(job: BackgroundTaskJob, onJobOver: (job: any, arg) => void) {
    const _break = this.config['break'];
    const _delay = this.config['delay'];
    this.pendingJobs.set(job.id, { job });
    const subject = new Subject();
    const next = (payload) => {
      onJobOver(job, {
        payload,
        success: !this.pendingJobs.get(job.id).canceled,
      });
    };

    this.worker(job)
      .pipe(delay(_delay), take(1))
      .subscribe({
        next,
        error: (err) => {
          if (!_break) {
            next(err);
          } else subject.error(err);
        },
        complete: () => subject.complete(),
      });

    return subject;
  }
}
