Add a ResourceBudget mechanism which keeps disk usage in check during syncs#7649
Conversation
Force-pushed from ffc20a0 to 3b86771
| Defaults to 5120 (5gb) |
| ### SYNC_MAX_IN_FLIGHT_ITEMS |
Actually the unit of measurement here is content, not artifacts, so this description is a little misleading - but it's a little difficult to describe.
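To make that concrete, a wording along these lines would spell out the content-unit measure. Everything below that isn't in the diff (the disk-budget setting name, the 500 default) is just a placeholder:

```python
# Hypothetical settings.py wording -- a suggestion, not the PR's documentation text.

# Rough ceiling on the sync working directory's disk usage, in MB.
# Defaults to 5120 (5 GB).
SYNC_DISK_BUDGET_MB = 5120  # placeholder name; the PR's actual setting may differ

# Maximum number of content units (not individual artifacts -- one content
# unit may carry several artifacts) allowed in flight between the download
# and save stages at once.
SYNC_MAX_IN_FLIGHT_ITEMS = 500  # 500 is the default mentioned later in this thread
```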
| DOMAIN_ENABLED = False |
| MAX_CONCURRENT_CONTENT = 25 |
| MAX_CONCURRENT_CONTENT = 200 |
Reset this back to where it was before we reduced it to limit worst-case scenarios
| """ | ||
| async for batch in self.batches(minsize=settings.MAX_CONCURRENT_CONTENT): | ||
| flush_event = self.resource_budget.pressure if self.resource_budget else None | ||
| minsize = self.resource_budget.max_items if self.resource_budget else 200 |
This is a little awkward: on one hand, the resource budget ought to manage this completely; on the other hand, it's hard to see the downside of setting a reasonably large batch size by default. The value is going to exist anyway (it defaults to 500 if you don't set it manually).
| await self.put(d_content) |
| class GenericDownloader(Stage): |
There's one open question, which is whether max_concurrent_content handling in GenericDownloader ought to be removed entirely.
ResourceBudget accomplishes the same goal but slightly differently: max_concurrent_content blocks items from being pulled from the queue, before the asyncio.Task is created, while ResourceBudget blocks on acquire() after the task has been created but before it does work.
So I suppose ResourceBudget might accumulate asyncio.Task objects, which might waste a little bit of memory but probably doesn't matter in practice? I'm not that familiar with these aspects of the pipeline or asyncio, though.
Another option is to just set a fairly permissive cap as a safety net and otherwise let ResourceBudget handle it.
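To illustrate the difference I mean (toy code, not pulpcore; only acquire() is a name taken from this PR, and its signature here is assumed):

```python
import asyncio


async def capped_consumer(queue, limit, handle):
    # max_concurrent_content style: block *before* creating the task, so at
    # most `limit` tasks (and their buffered items) exist at once.
    sem = asyncio.Semaphore(limit)
    while (item := await queue.get()) is not None:
        await sem.acquire()
        task = asyncio.create_task(handle(item))
        task.add_done_callback(lambda _t: sem.release())


async def budgeted_consumer(queue, budget, handle):
    # ResourceBudget style: create the task immediately; it blocks inside on
    # acquire() before doing any work, so tasks waiting on acquire() pile up
    # (a small amount of memory per queued item).
    pending = set()  # keep references so the tasks are not garbage collected

    async def worker(item):
        await budget.acquire(item.size)  # signature assumed
        await handle(item)               # release() happens later, after saving

    while (item := await queue.get()) is not None:
        task = asyncio.create_task(worker(item))
        pending.add(task)
        task.add_done_callback(pending.discard)
```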
And this is not even the same as max_concurrent_download (handled inside aiohttp), right?
I assume you mean either download_concurrency or rate_limit, but yes.
Lots of these primitives are very closely related but not 100% identical, and we could probably do with simplifying them.
IIRC, rate_limit throttles the total number of simultaneous downloads and download_concurrency is the number of simultaneous connections allowed to any single host.
MAX_CONCURRENT_CONTENT / max_concurrent_content is "don't buffer too many items inside the download stage at once".
Also, the former two operate on individual downloads (artifacts) while the latter operates on content, which may be multi-artifact.
The net impact of these is similar but not 100% identical.
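For reference, the first two are per-Remote options while MAX_CONCURRENT_CONTENT is a global setting; a rough, hedged example with arbitrary values (the comments paraphrase the recollection above rather than the documented semantics):

```python
# Illustrative only; values are arbitrary.
from pulp_file.app.models import FileRemote

remote = FileRemote(
    name="budget-demo",
    url="https://fixtures.example.invalid/PULP_MANIFEST",  # placeholder URL
    download_concurrency=10,  # per-remote download parallelism knob
    rate_limit=50,            # per-remote throttle on downloads
)
remote.save()
# MAX_CONCURRENT_CONTENT, by contrast, lives in settings and bounds how many
# content units the download stage buffers at once.
```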
| if flush_event_listener and flush_event_listener in done: |
| # Don't re-arm until after we yield a batch, to avoid a spin loop |
| # when the event stays set but the batch is empty. |
| flush_event_listener = None |
| no_block = True |
What is the fundamental difference here between the two events?
If I read it correctly, the thaw_event is cleared right before yielding.
Can these events be consolidated into one?
I'm not sure they can; I think they have different lifetimes, and the way they get re-armed is different.
| no_block = False |
| # Re-arm the flush listener after yielding |
| if flush_event and flush_event_listener is None: |
| flush_event_listener = asyncio.ensure_future(flush_event.wait()) |
Don't you need to .clear() the event to "rearm" it?
It gets cleared in release(); self.pressure and flush_event are the same event.
Would renaming "pressure" to "flush_event" help make that clearer?
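To check my reading of that lifecycle, here's a minimal sketch; only the names acquire, release, and pressure come from this PR, and everything else (constructor, counters, thresholds) is assumed:

```python
import asyncio


class ResourceBudgetSketch:
    """Minimal illustration of the pressure/acquire/release lifecycle.

    Not the PR's implementation: the accounting and limits are assumptions.
    """

    def __init__(self, max_bytes=5 * 1024**3, max_items=500):
        self.max_bytes = max_bytes
        self.max_items = max_items
        self._bytes = 0
        self._items = 0
        # Set while the budget is exceeded; batches() listens on it to flush
        # early. It is cleared again in release(), which is why the batching
        # code never calls .clear() itself.
        self.pressure = asyncio.Event()
        self._freed = asyncio.Condition()

    async def acquire(self, nbytes):
        # Backpressure: block new work while the budget is exhausted.
        async with self._freed:
            await self._freed.wait_for(
                lambda: self._bytes < self.max_bytes and self._items < self.max_items
            )
            self._bytes += nbytes
            self._items += 1
            if self._bytes >= self.max_bytes or self._items >= self.max_items:
                self.pressure.set()

    async def release(self, nbytes):
        async with self._freed:
            self._bytes -= nbytes
            self._items -= 1
            if self._bytes < self.max_bytes and self._items < self.max_items:
                self.pressure.clear()
            self._freed.notify_all()
```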
mdellweg left a comment
I think I can see now how this works.
There are many knobs to tweak, and we probably cannot expect to find a one-size-fits-all configuration.
One thing I keep wondering: are there situations where d_content is dropped between download and saving? Would we "leak" acquired resources that way?
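Purely to illustrate the concern (invented plumbing, not the PR's code): since acquire() happens in the downloader and release() in the saver, a downloader-side guard could hand the reservation back whenever an item fails to reach the next stage:

```python
async def download_and_hand_off(budget, d_content, nbytes, download, out_q):
    # Hypothetical helper: nbytes, download(), and out_q are stand-ins.
    await budget.acquire(nbytes)
    try:
        await download(d_content)
        await out_q.put(d_content)  # from here on, the saver stage owns the
                                    # reservation and calls release(nbytes)
    except BaseException:
        # The item never reached the saver, so give its budget back instead
        # of leaking it.
        await budget.release(nbytes)
        raise
```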
Artifacts stay on disk between the ArtifactDownloader stage and the ArtifactSaver stage. If too many large files build up, they can exceed the allotted filesystem space of the working directory. Previously we used unnecessarily small batch sizes by default in order to ensure the worst case was avoided. This approach dynamically controls how much disk space is being used by the task and provides backpressure when the limit is exceeded, flushing batches and preventing new artifacts from being downloaded.

closes pulp#7559

Assisted-By: claude-opus-4.6