import { Injectable, OnDestroy } from '@angular/core';
import { Observable, Subject } from 'rxjs';
import { filter, takeUntil } from 'rxjs/operators';
import { Listener, Method } from '../core/communication';
import { MonitoringBaseService } from '../core/monitoring-base.service';

@Injectable()
export class QueueItemsMonitoringService extends MonitoringBaseService implements OnDestroy {
  private _onDestroy = new Subject<void>();

  private _onLockQueueItem = new Subject<any>();
  private _onUnlockQueueItem = new Subject<any>();
  private _onQueueItemCompleted = new Subject<any>();

  constructor() {
    super('queue-items-monitoring');

    super.initListeners({
      [Listener.LockQueueItem]: (event) => this._onLockQueueItem.next(event),
      [Listener.UnlockQueueItem]: (event) => this._onUnlockQueueItem.next(event),
      [Listener.CompletedQueueItem]: (itemId, queueId) => this._onQueueItemCompleted.next({itemId, queueId}),
    });
  }
  
  public onLockQueueItem(queueId): Observable<any> { 
    return this._onLockQueueItem
    .pipe(takeUntil(this._onDestroy))
    .pipe(filter(e => !queueId || e.queueId === queueId));
  }
  
  public onUnlockQueueItem(queueId): Observable<any> { 
    return this._onUnlockQueueItem
    .pipe(takeUntil(this._onDestroy))
    .pipe(filter(e => !queueId || e.queueId === queueId));
  }
  
  public onQueueItemCompleted(queueId): Observable<any> { 
    return this._onQueueItemCompleted
    .pipe(takeUntil(this._onDestroy))
    .pipe(filter(e => !queueId || e.queueId === queueId));
  }

  public lockItem(itemId, queueId) {
    super.invoke(Method.LockItem, {
      itemId,
      queueId,
      isLocked: true,
    });
  }

  public unlockItem() {
    super.invoke(Method.UnlockItem);
  }

  public completeQueueItem(itemId, queueId) {
    super.invoke(Method.NotifyCompleted, itemId, queueId);
  }

  ngOnDestroy(): void {
    this._onDestroy.next();
    this._onDestroy.complete();

    super.onDestroy();
  }
}
