Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 149 additions & 1 deletion frontend/src/app/workspace/component/menu/menu.component.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import { ComponentFixture, TestBed } from "@angular/core/testing";
import { HttpClientTestingModule } from "@angular/common/http/testing";
import { RouterTestingModule } from "@angular/router/testing";
import { NzModalService, NzModalModule, NzModalRef } from "ng-zorro-antd/modal";
import { BehaviorSubject, of, throwError } from "rxjs";
import { BehaviorSubject, of, Subject, throwError } from "rxjs";

import { MenuComponent } from "./menu.component";
import { WorkflowWebsocketService } from "../../service/workflow-websocket/workflow-websocket.service";
import type { ExecutionDurationUpdateEvent } from "../../types/workflow-websocket.interface";
import { OperatorMetadataService } from "../../service/operator-metadata/operator-metadata.service";
import { StubOperatorMetadataService } from "../../service/operator-metadata/stub-operator-metadata.service";
import { ComputingUnitStatusService } from "../../../common/service/computing-unit/computing-unit-status/computing-unit-status.service";
Expand Down Expand Up @@ -636,4 +638,150 @@ describe("MenuComponent", () => {
});
});
});

// Regression coverage for #5323: the elapsed-time timer was refactored from a
// manually managed `durationUpdateSubscription` into a declarative `switchMap`
// pipe terminated by `untilDestroyed`. These tests pin the resulting behavior
// (base-duration updates, 1s cadence, restart-on-event, stop-when-idle) and,
// crucially, that the timer is torn down with the component so it cannot keep
// firing or leak after destroy.
describe("execution duration timer", () => {
let durationEvents$: Subject<{ type: "ExecutionDurationUpdateEvent" } & ExecutionDurationUpdateEvent>;
let timerFixture: ComponentFixture<MenuComponent>;
let timerComponent: MenuComponent;

function emitDuration(duration: number, isRunning: boolean): void {
durationEvents$.next({ type: "ExecutionDurationUpdateEvent", duration, isRunning });
}

beforeEach(() => {
vi.useFakeTimers();
durationEvents$ = new Subject();
const websocket = TestBed.inject(WorkflowWebsocketService);
const original = websocket.subscribeToEvent.bind(websocket);
// Only intercept the duration event; defer every other event type to the
// real implementation so unrelated subscriptions keep working.
vi.spyOn(websocket, "subscribeToEvent").mockImplementation((type: any) =>
type === "ExecutionDurationUpdateEvent" ? (durationEvents$.asObservable() as any) : original(type)
);

timerFixture = TestBed.createComponent(MenuComponent);
timerComponent = timerFixture.componentInstance;
timerFixture.detectChanges();
});

afterEach(() => {
vi.useRealTimers();
});

it("sets executionDuration to the event's base duration on each event", () => {
emitDuration(5000, false);
expect(timerComponent.executionDuration).toBe(5000);

emitDuration(8000, false);
expect(timerComponent.executionDuration).toBe(8000);
});

it("advances the duration by 1s every second while running", () => {
emitDuration(0, true);
expect(timerComponent.executionDuration).toBe(0);

vi.advanceTimersByTime(1000);
expect(timerComponent.executionDuration).toBe(1000);

vi.advanceTimersByTime(2000);
expect(timerComponent.executionDuration).toBe(3000);
});

it("does not start a timer when the execution is not running", () => {
emitDuration(7000, false);

vi.advanceTimersByTime(5000);

expect(timerComponent.executionDuration).toBe(7000);
});

it("restarts the 1s timer on each new running event, cancelling the previous one", () => {
emitDuration(0, true);
vi.advanceTimersByTime(1000);
expect(timerComponent.executionDuration).toBe(1000);

// A new event resets the base duration and restarts the cadence; the
// previous timer must be cancelled (switchMap) so it cannot double-count.
emitDuration(10000, true);
expect(timerComponent.executionDuration).toBe(10000);

vi.advanceTimersByTime(500);
expect(timerComponent.executionDuration).toBe(10000);

vi.advanceTimersByTime(500);
expect(timerComponent.executionDuration).toBe(11000);
});

it("stops the timer when a running execution transitions to not running", () => {
emitDuration(0, true);
vi.advanceTimersByTime(1000);
expect(timerComponent.executionDuration).toBe(1000);

emitDuration(2000, false);
vi.advanceTimersByTime(5000);
expect(timerComponent.executionDuration).toBe(2000);
});

it("tears down the timer on destroy so the duration stops advancing", () => {
emitDuration(0, true);
vi.advanceTimersByTime(1000);
expect(timerComponent.executionDuration).toBe(1000);

timerFixture.destroy();

// The previously running timer must not keep firing after destroy...
vi.advanceTimersByTime(5000);
expect(timerComponent.executionDuration).toBe(1000);

// ...nor should late events revive it (the source subscription is closed).
emitDuration(9999, true);
vi.advanceTimersByTime(5000);
expect(timerComponent.executionDuration).toBe(1000);
});
});

// Regression coverage for #5323: the computing-unit status subscription lost
// its manual `computingUnitStatusSubscription` aggregator and its
// `ngOnDestroy` unsubscribe, relying on `untilDestroyed` instead. These tests
// pin both that status updates still propagate and that they stop on destroy.
describe("computing unit status subscription", () => {
let status$: Subject<ComputingUnitState>;
let cuFixture: ComponentFixture<MenuComponent>;
let cuComponent: MenuComponent;

beforeEach(() => {
status$ = new Subject<ComputingUnitState>();
const cuService = TestBed.inject(ComputingUnitStatusService);
vi.spyOn(cuService, "getStatus").mockReturnValue(status$.asObservable());

cuFixture = TestBed.createComponent(MenuComponent);
cuComponent = cuFixture.componentInstance;
cuFixture.detectChanges();
});

it("updates computingUnitStatus and re-applies the run button behavior on each status emission", () => {
const applySpy = vi.spyOn(cuComponent, "applyRunButtonBehavior");

status$.next(ComputingUnitState.Running);

expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running);
expect(applySpy).toHaveBeenCalledTimes(1);
});

it("stops updating computingUnitStatus once the component is destroyed", () => {
status$.next(ComputingUnitState.Running);
expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running);

cuFixture.destroy();

status$.next(ComputingUnitState.NoComputingUnit);
expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running);
});
});
});
42 changes: 17 additions & 25 deletions frontend/src/app/workspace/component/menu/menu.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { WorkflowActionService } from "../../service/workflow-graph/model/workfl
import { ExecutionState } from "../../types/execute-workflow.interface";
import { WorkflowWebsocketService } from "../../service/workflow-websocket/workflow-websocket.service";
import { WorkflowResultExportService } from "../../service/workflow-result-export/workflow-result-export.service";
import { catchError, debounceTime, filter, mergeMap, tap } from "rxjs/operators";
import { catchError, debounceTime, filter, mergeMap, switchMap, tap } from "rxjs/operators";
import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
import { WorkflowUtilService } from "../../service/workflow-graph/util/workflow-util.service";
import { WorkflowVersionService } from "../../../dashboard/service/user/workflow-version/workflow-version.service";
Expand All @@ -43,7 +43,7 @@ import { saveAs } from "file-saver";
import { NotificationService } from "src/app/common/service/notification/notification.service";
import { OperatorMenuService } from "../../service/operator-menu/operator-menu.service";
import { CoeditorPresenceService } from "../../service/workflow-graph/model/coeditor-presence.service";
import { firstValueFrom, of, Subscription, timer } from "rxjs";
import { EMPTY, firstValueFrom, of, timer } from "rxjs";
import { isDefined } from "../../../common/util/predicate";
import { NzModalService } from "ng-zorro-antd/modal";
import { ResultExportationComponent } from "../result-exportation/result-exportation.component";
Expand Down Expand Up @@ -154,14 +154,12 @@ export class MenuComponent implements OnInit, OnDestroy {
public runDisable = false;

public executionDuration = 0;
private durationUpdateSubscription: Subscription = new Subscription();

// flag to display a particular version in the current canvas
public displayParticularWorkflowVersion: boolean = false;
public onClickRunHandler: () => void;

// Computing unit status variables
private computingUnitStatusSubscription: Subscription = new Subscription();
public selectedComputingUnit: DashboardWorkflowComputingUnit | null = null;
public computingUnitStatus: ComputingUnitState = ComputingUnitState.NoComputingUnit;

Expand Down Expand Up @@ -193,17 +191,14 @@ export class MenuComponent implements OnInit, OnDestroy {
) {
workflowWebsocketService
.subscribeToEvent("ExecutionDurationUpdateEvent")
.pipe(untilDestroyed(this))
.subscribe(event => {
this.executionDuration = event.duration;
this.durationUpdateSubscription.unsubscribe();
if (event.isRunning) {
this.durationUpdateSubscription = timer(1000, 1000)
.pipe(untilDestroyed(this))
.subscribe(() => {
this.executionDuration += 1000;
});
}
.pipe(
tap(event => (this.executionDuration = event.duration)),
// restart the 1s timer on each event, only while running
switchMap(event => (event.isRunning ? timer(1000, 1000) : EMPTY)),
untilDestroyed(this)
)
.subscribe(() => {
this.executionDuration += 1000;
});
this.executionState = executeWorkflowService.getExecutionState().state;
// return the run button after the execution is finished, either
Expand Down Expand Up @@ -254,7 +249,6 @@ export class MenuComponent implements OnInit, OnDestroy {

ngOnDestroy(): void {
this.workflowResultExportService.resetFlags();
this.computingUnitStatusSubscription.unsubscribe();
}

private subscribeToComputingUnitSelection(): void {
Expand All @@ -271,15 +265,13 @@ export class MenuComponent implements OnInit, OnDestroy {
*/
private subscribeToComputingUnitStatus(): void {
// Subscribe to get the computing unit status
this.computingUnitStatusSubscription.add(
this.computingUnitStatusService
.getStatus()
.pipe(untilDestroyed(this))
.subscribe(status => {
this.computingUnitStatus = status;
this.applyRunButtonBehavior(this.getRunButtonBehavior());
})
);
this.computingUnitStatusService
.getStatus()
.pipe(untilDestroyed(this))
.subscribe(status => {
this.computingUnitStatus = status;
this.applyRunButtonBehavior(this.getRunButtonBehavior());
});
}

/**
Expand Down
Loading