Skip to content

Add missing cases when the pipeline should be stopped #3564

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

leszko
Copy link
Contributor

@leszko leszko commented May 13, 2025

No description provided.

@github-actions github-actions bot added go Pull requests that update Go code AI Issues and PR related to the AI-video branch. labels May 13, 2025
@leszko leszko requested review from victorges, j0sh and mjh1 May 13, 2025 12:48
@@ -63,6 +63,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
if err := publisher.Close(); err != nil {
clog.Infof(ctx, "Error closing trickle publisher. err=%v", err)
}
params.liveParams.stopPipeline(fmt.Errorf("publisher is closed"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically it is the segment reader that is closing; the trickle publisher is being closed as a side effect. I also wonder how informative this will actually be; typically the segment reader is being closed for another reason that is already reported upstream, eg WHIP disconnect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but shouldn't we just stop the pipeline in any failure? How would the pipeline function if the publisher is closed?

I'd try to avoid this leaking, because in this case, the publisher is stopped, but we still have the whole pipeline running.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are closing the publisher here as a result of the segment reader closing, which happens because the ingest connection has terminated. The publisher is not closing spontaneously on its own. That isn't a failure, it's just a normal teardown. That only happens in one place per ingest method (mediamtx) (whip).

To be clear, this should be harmless, I just don't know how much value it will add vis-a-vis being noise in the metabase event stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if no-op right now, I think it's better to make a clear code flow. If we stop publishing or subscribing any data, just stop everything.

Otherwise, it's confusing, I've started to detach the ingest closing from the trickle publish (for O Swapping) and suddenly realized, "hey the old publisher is still working, why?". So, I'd like to avoid such pitfalls. We may not print an error message, we may not send an error event, but I think we should have a clear condition for stopping everything if any of the trickle parts is not working.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have a clear condition for stopping everything if any of the trickle parts is not working.

Sure, but as far as I can tell, the only reason trickle is "not working" at this point is because the input has already stopped. Do we need to stop again? Do you see a case where it might not be stopped?

Generally I would prefer not to sprinkle cleanup functions around without fully understanding the code paths that could lead to those. For example, it is very helpful to have the first error be closest to the root cause. If a teardown "error" happens to be sent out first, then that masks an important piece of information. So we should be careful when adding stopPipeline calls (and preferably propagate errors as far back upstream as possible, because calls like stopPipeline kick off an inherently async teardown process which can make the overall flow hard to trace out.)

Anyway, that being said, if we can be sure that "publisher is closed" won't be the first error sent out under normal conditions then this is probably fine. Also, do you see cases where we'd see this as the first error?

@@ -368,6 +370,7 @@ func startControlPublish(ctx context.Context, control *url.URL, params aiRequest
}
// if there was another type of error, we'll just retry anyway
case <-done:
params.liveParams.stopPipeline(fmt.Errorf("control publish stopped %w", err))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally the input is already disconnected by this point, too, since StopControl (which closes the done channel) is only called once per ingest method, after the input has disconnected (mediamtx)(whip).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same what I wrote above, maybe it's already handled, but in the different part of the code, I think we should make it clear that if any Trickle part is closed, we stop the pipeline.

@leszko leszko requested a review from j0sh May 14, 2025 06:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
AI Issues and PR related to the AI-video branch. go Pull requests that update Go code
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants