Revert "Revert "Refactor rate_limit.go for more clarify""
This reverts commit 526416b1e49931b13948d1ccff8d50f2c1b4d178.
Figured out a fix for the deadlocks; resubmitting the patch.
The first version was deadlocking because the switch statement in
zipWriter.write would choose a specific zip entry to start writing,
but the individual chunks may not have all necessarily been compressed
yet. When each individual chunk was made to require to request its own
allocations, the compression of the chunks of the file being currently
written could be blocked waiting for memory to be freed by chunks from
other files that hadn't yet started being written.
This patch is much like the original except it preallocates the memory
for the entire file upfront (and happens to use the total file size
rather than the compressed size, but I didn't observe that to cause any
performance differences).
Bug: 64536066
Test: m -j dist showcommands # which runs soong_zip to package everything
Change-Id: Id1d7ff415e54d3a6be71188abbdbbbab5a719fcf
diff --git a/cmd/soong_zip/rate_limit.go b/cmd/soong_zip/rate_limit.go
index 9e95bc1..9cb5fdd 100644
--- a/cmd/soong_zip/rate_limit.go
+++ b/cmd/soong_zip/rate_limit.go
@@ -15,71 +15,54 @@
package main
import (
+ "fmt"
"runtime"
)
type RateLimit struct {
- requests chan struct{}
- finished chan int
- released chan int
- stop chan struct{}
+ requests chan request
+ completions chan int64
+
+ stop chan struct{}
}
-// NewRateLimit starts a new rate limiter with maxExecs number of executions
-// allowed to happen at a time. If maxExecs is <= 0, it will default to the
-// number of logical CPUs on the system.
-//
-// With Finish and Release, we'll keep track of outstanding buffer sizes to be
-// written. If that size goes above maxMem, we'll prevent starting new
-// executions.
-//
-// The total memory use may be higher due to current executions. This just
-// prevents runaway memory use due to slower writes.
-func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
- if maxExecs <= 0 {
- maxExecs = runtime.NumCPU()
- }
- if maxMem <= 0 {
- // Default to 512MB
- maxMem = 512 * 1024 * 1024
- }
+type request struct {
+ size int64
+ serviced chan struct{}
+}
+// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
+// except when no capacity is in use, in which case the first caller is always permitted
+func NewRateLimit(capacity int64) *RateLimit {
ret := &RateLimit{
- requests: make(chan struct{}),
+ requests: make(chan request),
+ completions: make(chan int64),
- // Let all of the pending executions to mark themselves as finished,
- // even if our goroutine isn't processing input.
- finished: make(chan int, maxExecs),
-
- released: make(chan int),
- stop: make(chan struct{}),
+ stop: make(chan struct{}),
}
- go ret.goFunc(maxExecs, maxMem)
+ go ret.monitorChannels(capacity)
return ret
}
-// RequestExecution blocks until another execution can be allowed to run.
-func (r *RateLimit) RequestExecution() Execution {
- <-r.requests
- return r.finished
+// RequestExecution blocks until another execution of size <size> can be allowed to run.
+func (r *RateLimit) Request(size int64) {
+ request := request{
+ size: size,
+ serviced: make(chan struct{}, 1),
+ }
+
+ // wait for the request to be received
+ r.requests <- request
+
+ // wait for the request to be accepted
+ <-request.serviced
}
-type Execution chan<- int
-
-// Finish will mark your execution as finished, and allow another request to be
-// approved.
-//
-// bufferSize may be specified to count memory buffer sizes, and must be
-// matched with calls to RateLimit.Release to mark the buffers as released.
-func (e Execution) Finish(bufferSize int) {
- e <- bufferSize
-}
-
-// Call Release when finished with a buffer recorded with Finish.
-func (r *RateLimit) Release(bufferSize int) {
- r.released <- bufferSize
+// Finish declares the completion of an execution of size <size>
+func (r *RateLimit) Finish(size int64) {
+ r.completions <- size
}
// Stop the background goroutine
@@ -87,29 +70,83 @@
close(r.stop)
}
-func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
- var curExecs int
- var curMemory int64
+// monitorChannels processes incoming requests from channels
+func (r *RateLimit) monitorChannels(capacity int64) {
+ var usedCapacity int64
+ var currentRequest *request
for {
- var requests chan struct{}
- if curExecs < maxExecs && curMemory < maxMem {
+ var requests chan request
+ if currentRequest == nil {
+ // If we don't already have a queued request, then we should check for a new request
requests = r.requests
}
select {
- case requests <- struct{}{}:
- curExecs++
- case amount := <-r.finished:
- curExecs--
- curMemory += int64(amount)
- if curExecs < 0 {
- panic("curExecs < 0")
+ case newRequest := <-requests:
+ currentRequest = &newRequest
+ case amountCompleted := <-r.completions:
+ usedCapacity -= amountCompleted
+
+ if usedCapacity < 0 {
+ panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted))
}
- case amount := <-r.released:
- curMemory -= int64(amount)
case <-r.stop:
return
}
+
+ if currentRequest != nil {
+ accepted := false
+ if usedCapacity == 0 {
+ accepted = true
+ } else {
+ if capacity >= usedCapacity+currentRequest.size {
+ accepted = true
+ }
+ }
+ if accepted {
+ usedCapacity += currentRequest.size
+ currentRequest.serviced <- struct{}{}
+ currentRequest = nil
+ }
+ }
}
}
+
+// A CPURateLimiter limits the number of active calls based on CPU requirements
+type CPURateLimiter struct {
+ impl *RateLimit
+}
+
+func NewCPURateLimiter(capacity int64) *CPURateLimiter {
+ if capacity <= 0 {
+ capacity = int64(runtime.NumCPU())
+ }
+ impl := NewRateLimit(capacity)
+ return &CPURateLimiter{impl: impl}
+}
+
+func (e CPURateLimiter) Request() {
+ e.impl.Request(1)
+}
+
+func (e CPURateLimiter) Finish() {
+ e.impl.Finish(1)
+}
+
+func (e CPURateLimiter) Stop() {
+ e.impl.Stop()
+}
+
+// A MemoryRateLimiter limits the number of active calls based on Memory requirements
+type MemoryRateLimiter struct {
+ *RateLimit
+}
+
+func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
+ if capacity <= 0 {
+ capacity = 512 * 1024 * 1024 // 512MB
+ }
+ impl := NewRateLimit(capacity)
+ return &MemoryRateLimiter{RateLimit: impl}
+}