import {Injectable} from '@angular/core';
import {Observable, ReplaySubject} from 'rxjs';
import {EJobStatus, JobModel, MonitoredJobModel} from '../../models/api/job.model';
import {filter, takeWhile} from 'rxjs/operators';
import {SocketService} from './socket/socket.service';

/**
 * Keeps an in-memory list of jobs the application is monitoring and exposes
 * it as an observable stream. Also builds per-job observables from the
 * socket message stream.
 */
@Injectable({
    providedIn: 'root'
})
export class MonitoredJobsService {
    /** Jobs currently being monitored; the most recently added job comes first. */
    private monitoredJobs: MonitoredJobModel[] = [];

    /**
     * Carries the monitored job list. The buffer size of 1 means a new
     * subscriber immediately receives the latest announced list, if any.
     */
    private readonly monitoredJobsSubject: ReplaySubject<JobModel[]> = new ReplaySubject<JobModel[]>(1);

    constructor(private readonly socketService: SocketService) {}

    /**
     * Returns a stream of the monitored job list.
     *
     * Nothing is emitted until a job has been added or removed at least once;
     * after that, each subscriber first receives the latest announced list.
     *
     * @returns an observable view of the list (the underlying subject is not
     *          exposed, so consumers cannot push values into or complete it)
     */
    public getAllMonitoredJobs(): Observable<JobModel[]> {
        return this.monitoredJobsSubject.asObservable();
    }

    /**
     * Removes every monitored entry with the given id and announces the
     * updated list. The list is announced even when no entry matched.
     *
     * @param jobId id of the job to stop monitoring
     */
    public removeMonitoredJob(jobId: string): void {
        this.monitoredJobs = this.monitoredJobs.filter((monitoredJob) => monitoredJob._id !== jobId);
        this.announceMonitorJobs();
    }

    /**
     * Creates an observable of socket job messages for a single job.
     *
     * Messages are forwarded while the job status is neither DONE nor FAILED.
     * The first message with one of those statuses is still emitted (inclusive
     * `takeWhile`), and then the observable completes.
     *
     * @param jobId id of the job to follow
     * @returns an observable of messages whose `_id` equals `jobId`
     */
    public getJobMonitor(jobId: string): Observable<JobModel> {
        const terminalStatuses = [EJobStatus.DONE, EJobStatus.FAILED];

        return this.socketService.jobMessages$
            .pipe(
                filter((job) => job._id === jobId),
                takeWhile((job) => !terminalStatuses.includes(job.status), true)
            );
    }

    /**
     * Emits the current monitored list to subscribers. Each entry is mapped to
     * a fresh JobModel built from its `_id` and `type` only.
     */
    private announceMonitorJobs(): void {
        const snapshot = this.monitoredJobs.map(
            (monitoredJob) => new JobModel(monitoredJob._id, monitoredJob.type)
        );
        this.monitoredJobsSubject.next(snapshot);
    }

    /**
     * Puts a job at the front of the monitored list and announces the updated
     * list.
     *
     * NOTE(review): no de-duplication is performed, so adding the same job twice
     * yields two entries — confirm whether callers rely on this.
     *
     * @param job job to start monitoring
     */
    public addJobToMonitor(job: JobModel): void {
        // Build a new array instead of mutating in place, consistent with
        // removeMonitoredJob; the resulting order is the same as unshift().
        this.monitoredJobs = [job as MonitoredJobModel, ...this.monitoredJobs];
        this.announceMonitorJobs();
    }
}
