From 28b023b10a2b67ebdea64bfafd8b9ddfefa8375d Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Tue, 2 Jun 2026 19:56:55 -0700 Subject: [PATCH 1/2] fixed old unsubscribe scripts left over --- .../component/menu/menu.component.ts | 42 ++++++++----------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/frontend/src/app/workspace/component/menu/menu.component.ts b/frontend/src/app/workspace/component/menu/menu.component.ts index 69f08e4fefb..aa9b3a07277 100644 --- a/frontend/src/app/workspace/component/menu/menu.component.ts +++ b/frontend/src/app/workspace/component/menu/menu.component.ts @@ -33,7 +33,7 @@ import { WorkflowActionService } from "../../service/workflow-graph/model/workfl import { ExecutionState } from "../../types/execute-workflow.interface"; import { WorkflowWebsocketService } from "../../service/workflow-websocket/workflow-websocket.service"; import { WorkflowResultExportService } from "../../service/workflow-result-export/workflow-result-export.service"; -import { catchError, debounceTime, filter, mergeMap, tap } from "rxjs/operators"; +import { catchError, debounceTime, filter, mergeMap, switchMap, tap } from "rxjs/operators"; import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; import { WorkflowUtilService } from "../../service/workflow-graph/util/workflow-util.service"; import { WorkflowVersionService } from "../../../dashboard/service/user/workflow-version/workflow-version.service"; @@ -43,7 +43,7 @@ import { saveAs } from "file-saver"; import { NotificationService } from "src/app/common/service/notification/notification.service"; import { OperatorMenuService } from "../../service/operator-menu/operator-menu.service"; import { CoeditorPresenceService } from "../../service/workflow-graph/model/coeditor-presence.service"; -import { firstValueFrom, of, Subscription, timer } from "rxjs"; +import { EMPTY, firstValueFrom, of, timer } from "rxjs"; import { isDefined } from "../../../common/util/predicate"; import { NzModalService } from "ng-zorro-antd/modal"; import { ResultExportationComponent } from "../result-exportation/result-exportation.component"; @@ -154,14 +154,12 @@ export class MenuComponent implements OnInit, OnDestroy { public runDisable = false; public executionDuration = 0; - private durationUpdateSubscription: Subscription = new Subscription(); // flag to display a particular version in the current canvas public displayParticularWorkflowVersion: boolean = false; public onClickRunHandler: () => void; // Computing unit status variables - private computingUnitStatusSubscription: Subscription = new Subscription(); public selectedComputingUnit: DashboardWorkflowComputingUnit | null = null; public computingUnitStatus: ComputingUnitState = ComputingUnitState.NoComputingUnit; @@ -193,17 +191,14 @@ export class MenuComponent implements OnInit, OnDestroy { ) { workflowWebsocketService .subscribeToEvent("ExecutionDurationUpdateEvent") - .pipe(untilDestroyed(this)) - .subscribe(event => { - this.executionDuration = event.duration; - this.durationUpdateSubscription.unsubscribe(); - if (event.isRunning) { - this.durationUpdateSubscription = timer(1000, 1000) - .pipe(untilDestroyed(this)) - .subscribe(() => { - this.executionDuration += 1000; - }); - } + .pipe( + tap(event => (this.executionDuration = event.duration)), + // restart the 1s timer on each event, only while running + switchMap(event => (event.isRunning ? timer(1000, 1000) : EMPTY)), + untilDestroyed(this) + ) + .subscribe(() => { + this.executionDuration += 1000; }); this.executionState = executeWorkflowService.getExecutionState().state; // return the run button after the execution is finished, either @@ -254,7 +249,6 @@ export class MenuComponent implements OnInit, OnDestroy { ngOnDestroy(): void { this.workflowResultExportService.resetFlags(); - this.computingUnitStatusSubscription.unsubscribe(); } private subscribeToComputingUnitSelection(): void { @@ -271,15 +265,13 @@ export class MenuComponent implements OnInit, OnDestroy { */ private subscribeToComputingUnitStatus(): void { // Subscribe to get the computing unit status - this.computingUnitStatusSubscription.add( - this.computingUnitStatusService - .getStatus() - .pipe(untilDestroyed(this)) - .subscribe(status => { - this.computingUnitStatus = status; - this.applyRunButtonBehavior(this.getRunButtonBehavior()); - }) - ); + this.computingUnitStatusService + .getStatus() + .pipe(untilDestroyed(this)) + .subscribe(status => { + this.computingUnitStatus = status; + this.applyRunButtonBehavior(this.getRunButtonBehavior()); + }); } /** From b7bb8c96078a17c5e8b6c9c7c7aa28ef7df46695 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Wed, 3 Jun 2026 00:00:26 -0700 Subject: [PATCH 2/2] added test --- .../component/menu/menu.component.spec.ts | 150 +++++++++++++++++- 1 file changed, 149 insertions(+), 1 deletion(-) diff --git a/frontend/src/app/workspace/component/menu/menu.component.spec.ts b/frontend/src/app/workspace/component/menu/menu.component.spec.ts index a1865a26282..3825da95a09 100644 --- a/frontend/src/app/workspace/component/menu/menu.component.spec.ts +++ b/frontend/src/app/workspace/component/menu/menu.component.spec.ts @@ -22,9 +22,11 @@ import { ComponentFixture, TestBed } from "@angular/core/testing"; import { HttpClientTestingModule } from "@angular/common/http/testing"; import { RouterTestingModule } from "@angular/router/testing"; import { NzModalService, NzModalModule, NzModalRef } from "ng-zorro-antd/modal"; -import { BehaviorSubject, of, throwError } from "rxjs"; +import { BehaviorSubject, of, Subject, throwError } from "rxjs"; import { MenuComponent } from "./menu.component"; +import { WorkflowWebsocketService } from "../../service/workflow-websocket/workflow-websocket.service"; +import type { ExecutionDurationUpdateEvent } from "../../types/workflow-websocket.interface"; import { OperatorMetadataService } from "../../service/operator-metadata/operator-metadata.service"; import { StubOperatorMetadataService } from "../../service/operator-metadata/stub-operator-metadata.service"; import { ComputingUnitStatusService } from "../../../common/service/computing-unit/computing-unit-status/computing-unit-status.service"; @@ -636,4 +638,150 @@ describe("MenuComponent", () => { }); }); }); + + // Regression coverage for #5323: the elapsed-time timer was refactored from a + // manually managed `durationUpdateSubscription` into a declarative `switchMap` + // pipe terminated by `untilDestroyed`. These tests pin the resulting behavior + // (base-duration updates, 1s cadence, restart-on-event, stop-when-idle) and, + // crucially, that the timer is torn down with the component so it cannot keep + // firing or leak after destroy. + describe("execution duration timer", () => { + let durationEvents$: Subject<{ type: "ExecutionDurationUpdateEvent" } & ExecutionDurationUpdateEvent>; + let timerFixture: ComponentFixture; + let timerComponent: MenuComponent; + + function emitDuration(duration: number, isRunning: boolean): void { + durationEvents$.next({ type: "ExecutionDurationUpdateEvent", duration, isRunning }); + } + + beforeEach(() => { + vi.useFakeTimers(); + durationEvents$ = new Subject(); + const websocket = TestBed.inject(WorkflowWebsocketService); + const original = websocket.subscribeToEvent.bind(websocket); + // Only intercept the duration event; defer every other event type to the + // real implementation so unrelated subscriptions keep working. + vi.spyOn(websocket, "subscribeToEvent").mockImplementation((type: any) => + type === "ExecutionDurationUpdateEvent" ? (durationEvents$.asObservable() as any) : original(type) + ); + + timerFixture = TestBed.createComponent(MenuComponent); + timerComponent = timerFixture.componentInstance; + timerFixture.detectChanges(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("sets executionDuration to the event's base duration on each event", () => { + emitDuration(5000, false); + expect(timerComponent.executionDuration).toBe(5000); + + emitDuration(8000, false); + expect(timerComponent.executionDuration).toBe(8000); + }); + + it("advances the duration by 1s every second while running", () => { + emitDuration(0, true); + expect(timerComponent.executionDuration).toBe(0); + + vi.advanceTimersByTime(1000); + expect(timerComponent.executionDuration).toBe(1000); + + vi.advanceTimersByTime(2000); + expect(timerComponent.executionDuration).toBe(3000); + }); + + it("does not start a timer when the execution is not running", () => { + emitDuration(7000, false); + + vi.advanceTimersByTime(5000); + + expect(timerComponent.executionDuration).toBe(7000); + }); + + it("restarts the 1s timer on each new running event, cancelling the previous one", () => { + emitDuration(0, true); + vi.advanceTimersByTime(1000); + expect(timerComponent.executionDuration).toBe(1000); + + // A new event resets the base duration and restarts the cadence; the + // previous timer must be cancelled (switchMap) so it cannot double-count. + emitDuration(10000, true); + expect(timerComponent.executionDuration).toBe(10000); + + vi.advanceTimersByTime(500); + expect(timerComponent.executionDuration).toBe(10000); + + vi.advanceTimersByTime(500); + expect(timerComponent.executionDuration).toBe(11000); + }); + + it("stops the timer when a running execution transitions to not running", () => { + emitDuration(0, true); + vi.advanceTimersByTime(1000); + expect(timerComponent.executionDuration).toBe(1000); + + emitDuration(2000, false); + vi.advanceTimersByTime(5000); + expect(timerComponent.executionDuration).toBe(2000); + }); + + it("tears down the timer on destroy so the duration stops advancing", () => { + emitDuration(0, true); + vi.advanceTimersByTime(1000); + expect(timerComponent.executionDuration).toBe(1000); + + timerFixture.destroy(); + + // The previously running timer must not keep firing after destroy... + vi.advanceTimersByTime(5000); + expect(timerComponent.executionDuration).toBe(1000); + + // ...nor should late events revive it (the source subscription is closed). + emitDuration(9999, true); + vi.advanceTimersByTime(5000); + expect(timerComponent.executionDuration).toBe(1000); + }); + }); + + // Regression coverage for #5323: the computing-unit status subscription lost + // its manual `computingUnitStatusSubscription` aggregator and its + // `ngOnDestroy` unsubscribe, relying on `untilDestroyed` instead. These tests + // pin both that status updates still propagate and that they stop on destroy. + describe("computing unit status subscription", () => { + let status$: Subject; + let cuFixture: ComponentFixture; + let cuComponent: MenuComponent; + + beforeEach(() => { + status$ = new Subject(); + const cuService = TestBed.inject(ComputingUnitStatusService); + vi.spyOn(cuService, "getStatus").mockReturnValue(status$.asObservable()); + + cuFixture = TestBed.createComponent(MenuComponent); + cuComponent = cuFixture.componentInstance; + cuFixture.detectChanges(); + }); + + it("updates computingUnitStatus and re-applies the run button behavior on each status emission", () => { + const applySpy = vi.spyOn(cuComponent, "applyRunButtonBehavior"); + + status$.next(ComputingUnitState.Running); + + expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running); + expect(applySpy).toHaveBeenCalledTimes(1); + }); + + it("stops updating computingUnitStatus once the component is destroyed", () => { + status$.next(ComputingUnitState.Running); + expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running); + + cuFixture.destroy(); + + status$.next(ComputingUnitState.NoComputingUnit); + expect(cuComponent.computingUnitStatus).toBe(ComputingUnitState.Running); + }); + }); });