diff --git a/controller/getchangedtargets.go b/controller/getchangedtargets.go index baf075d..dda3732 100644 --- a/controller/getchangedtargets.go +++ b/controller/getchangedtargets.go @@ -38,7 +38,9 @@ type job struct { cancel context.CancelFunc } -// GetChangedTargets returns the changed targets between two revisions. +// GetChangedTargets returns the changed targets between two revisions. If the +// client disconnects, the stream's context is cancelled and the function +// returns with context.Canceled. func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, stream pb.TangoServiceGetChangedTargetsYARPCServer) (retErr error) { scope := c.scope.SubScope("get_changed_targets") scope.Counter("calls").Inc(1) @@ -239,7 +241,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str jobs[1].graphStreamChunks = nil compareStart := time.Now() - changedTargetsResponses, err := c.compareTargetGraphs(logger, firstGraph, secondGraph, maxDist, request.GetOutputConfig().GetComputeDistances()) + changedTargetsResponses, err := c.compareTargetGraphs(ctx, logger, firstGraph, secondGraph, maxDist, request.GetOutputConfig().GetComputeDistances()) // Allow GC of raw graph data while the caching goroutine runs. firstGraph = nil secondGraph = nil @@ -294,7 +296,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str // requested or when filtering by distance is active. Output IDs are // re-mapped into a canonical per-call namespace so the response metadata // only carries the names actually referenced. -func (c *controller) compareTargetGraphs(logger *zap.Logger, firstGraph, secondGraph []*pb.GetTargetGraphResponse, maxDist int32, outputDistances bool) ([]*pb.GetChangedTargetsResponse, error) { +func (c *controller) compareTargetGraphs(ctx context.Context, logger *zap.Logger, firstGraph, secondGraph []*pb.GetTargetGraphResponse, maxDist int32, outputDistances bool) ([]*pb.GetChangedTargetsResponse, error) { start := time.Now() scope := c.scope.SubScope("compare_target_graphs") logger.Info("compareTargetGraphs: Computing differences between target graphs") @@ -302,19 +304,42 @@ func (c *controller) compareTargetGraphs(logger *zap.Logger, firstGraph, secondG // 1) Extract targets and metadata; index by canonical names indexStart := time.Now() firstTargetsByID, firstMetadata := getTargetsAndMetadata(firstGraph) + + if ctx.Err() != nil { + return nil, ctx.Err() + } + secondTargetsByID, secondMetadata := getTargetsAndMetadata(secondGraph) + + if ctx.Err() != nil { + return nil, ctx.Err() + } + // Release raw chunk slices — individual target protos are now held by the ID maps. firstGraph = nil secondGraph = nil firstByName := buildNameIndex(firstTargetsByID, firstMetadata) firstTargetsByID = nil // all pointers are now in firstByName; drop the duplicate map + + if ctx.Err() != nil { + return nil, ctx.Err() + } + secondByName := buildNameIndex(secondTargetsByID, secondMetadata) secondTargetsByID = nil indexDuration := time.Since(indexStart) scope.Timer("index_duration").Record(indexDuration) + if ctx.Err() != nil { + return nil, ctx.Err() + } + sourceFileRuleTypeID := detectSourceFileID(secondMetadata) + if ctx.Err() != nil { + return nil, ctx.Err() + } + changedByName := make(map[string]*pb.ChangedTarget) changedSourceFileTargets := make(map[string]struct{}) @@ -399,6 +424,10 @@ func (c *controller) compareTargetGraphs(logger *zap.Logger, firstGraph, secondG diffScanDuration := time.Since(diffScanStart) scope.Timer("diff_scan_duration").Record(diffScanDuration) + if ctx.Err() != nil { + return nil, ctx.Err() + } + // Iterate over the changed targets and check if any of them are DIRECT changes. classifyStart := time.Now() for name, ct := range changedByName { @@ -436,6 +465,10 @@ func (c *controller) compareTargetGraphs(logger *zap.Logger, firstGraph, secondG classifyDuration := time.Since(classifyStart) scope.Timer("classify_duration").Record(classifyDuration) + if ctx.Err() != nil { + return nil, ctx.Err() + } + // Compute BFS distances when filtering is active or the client requested distance output. if maxDist >= 0 || outputDistances { distancesStart := time.Now() @@ -444,6 +477,10 @@ func (c *controller) compareTargetGraphs(logger *zap.Logger, firstGraph, secondG scope.Timer("distances_duration").Record(distancesDuration) } + if ctx.Err() != nil { + return nil, ctx.Err() + } + // Collect changed targets. changed := make([]*pb.ChangedTarget, 0, len(changedByName)) for _, ct := range changedByName { diff --git a/controller/getchangedtargets_test.go b/controller/getchangedtargets_test.go index 8342bf3..e049310 100644 --- a/controller/getchangedtargets_test.go +++ b/controller/getchangedtargets_test.go @@ -137,7 +137,7 @@ func TestCompareTargetGraphs(t *testing.T) { }, } - response, err := c.compareTargetGraphs(zap.NewNop(), []*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, -1, false) + response, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), []*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, -1, false) require.NoError(t, err) require.NotNil(t, response) } @@ -436,7 +436,7 @@ func TestCompareTargetGraphs_NewTarget_CanonicalIDs(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false) + res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1, false) require.NoError(t, err) require.Len(t, res, 2) cs := res[0].GetChangedTargets() @@ -508,7 +508,7 @@ func TestCompareTargetGraphs_SourceFileDirectAndPropagation(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false) + res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1, false) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -576,7 +576,7 @@ func TestCompareTargetGraphs_IndirectWhenNoSourceDep(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false) + res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1, false) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -641,7 +641,7 @@ func TestCompareTargetGraphs_DirectWhenDependenciesChanged(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false) + res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1, false) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -722,7 +722,7 @@ func TestCompareTargetGraphs_DirectWhenAttributesChanged(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false) + res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1, false) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -802,7 +802,7 @@ func TestCompareTargetGraphs_DirectWhenNewAttributeAdded(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false) + res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1, false) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -1101,7 +1101,7 @@ func TestCompareTargetGraphs_IndirectWhenOnlyHashChanged(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false) + res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1, false) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs)