blob: 0ea2ef084d1e741f291f1a772246a77c3439e0bd [file] [log] [blame]
Dan Willemsen017d8932016-08-04 15:43:03 -07001// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
Jeff Gaston11b5c512017-10-12 12:19:14 -070015package zip
Dan Willemsen017d8932016-08-04 15:43:03 -070016
17import (
Jeff Gaston175f34c2017-08-17 21:43:21 -070018 "fmt"
Dan Willemsen017d8932016-08-04 15:43:03 -070019 "runtime"
20)
21
22type RateLimit struct {
Jeff Gaston175f34c2017-08-17 21:43:21 -070023 requests chan request
24 completions chan int64
25
26 stop chan struct{}
Dan Willemsen017d8932016-08-04 15:43:03 -070027}
28
Jeff Gaston175f34c2017-08-17 21:43:21 -070029type request struct {
30 size int64
31 serviced chan struct{}
32}
Dan Willemsen017d8932016-08-04 15:43:03 -070033
Jeff Gaston175f34c2017-08-17 21:43:21 -070034// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
35// except when no capacity is in use, in which case the first caller is always permitted
36func NewRateLimit(capacity int64) *RateLimit {
Colin Cross526416b2017-08-17 23:54:51 +000037 ret := &RateLimit{
Jeff Gaston175f34c2017-08-17 21:43:21 -070038 requests: make(chan request),
39 completions: make(chan int64),
Colin Cross526416b2017-08-17 23:54:51 +000040
Jeff Gaston175f34c2017-08-17 21:43:21 -070041 stop: make(chan struct{}),
Colin Cross526416b2017-08-17 23:54:51 +000042 }
43
Jeff Gaston175f34c2017-08-17 21:43:21 -070044 go ret.monitorChannels(capacity)
Dan Willemsen017d8932016-08-04 15:43:03 -070045
46 return ret
47}
48
Jeff Gaston175f34c2017-08-17 21:43:21 -070049// RequestExecution blocks until another execution of size <size> can be allowed to run.
50func (r *RateLimit) Request(size int64) {
51 request := request{
52 size: size,
53 serviced: make(chan struct{}, 1),
54 }
55
56 // wait for the request to be received
57 r.requests <- request
58
59 // wait for the request to be accepted
60 <-request.serviced
Dan Willemsen017d8932016-08-04 15:43:03 -070061}
62
Jeff Gaston175f34c2017-08-17 21:43:21 -070063// Finish declares the completion of an execution of size <size>
64func (r *RateLimit) Finish(size int64) {
65 r.completions <- size
Dan Willemsen017d8932016-08-04 15:43:03 -070066}
67
68// Stop the background goroutine
69func (r *RateLimit) Stop() {
70 close(r.stop)
71}
72
Jeff Gaston175f34c2017-08-17 21:43:21 -070073// monitorChannels processes incoming requests from channels
74func (r *RateLimit) monitorChannels(capacity int64) {
75 var usedCapacity int64
76 var currentRequest *request
Dan Willemsen017d8932016-08-04 15:43:03 -070077
78 for {
Jeff Gaston175f34c2017-08-17 21:43:21 -070079 var requests chan request
80 if currentRequest == nil {
81 // If we don't already have a queued request, then we should check for a new request
Dan Willemsen017d8932016-08-04 15:43:03 -070082 requests = r.requests
83 }
84
85 select {
Jeff Gaston175f34c2017-08-17 21:43:21 -070086 case newRequest := <-requests:
87 currentRequest = &newRequest
88 case amountCompleted := <-r.completions:
89 usedCapacity -= amountCompleted
90
91 if usedCapacity < 0 {
92 panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted))
Dan Willemsen017d8932016-08-04 15:43:03 -070093 }
Dan Willemsen017d8932016-08-04 15:43:03 -070094 case <-r.stop:
95 return
96 }
Jeff Gaston175f34c2017-08-17 21:43:21 -070097
98 if currentRequest != nil {
99 accepted := false
100 if usedCapacity == 0 {
101 accepted = true
102 } else {
103 if capacity >= usedCapacity+currentRequest.size {
104 accepted = true
105 }
106 }
107 if accepted {
108 usedCapacity += currentRequest.size
109 currentRequest.serviced <- struct{}{}
110 currentRequest = nil
111 }
112 }
Dan Willemsen017d8932016-08-04 15:43:03 -0700113 }
114}
Jeff Gaston175f34c2017-08-17 21:43:21 -0700115
116// A CPURateLimiter limits the number of active calls based on CPU requirements
117type CPURateLimiter struct {
118 impl *RateLimit
119}
120
121func NewCPURateLimiter(capacity int64) *CPURateLimiter {
122 if capacity <= 0 {
123 capacity = int64(runtime.NumCPU())
124 }
125 impl := NewRateLimit(capacity)
126 return &CPURateLimiter{impl: impl}
127}
128
129func (e CPURateLimiter) Request() {
130 e.impl.Request(1)
131}
132
133func (e CPURateLimiter) Finish() {
134 e.impl.Finish(1)
135}
136
137func (e CPURateLimiter) Stop() {
138 e.impl.Stop()
139}
140
141// A MemoryRateLimiter limits the number of active calls based on Memory requirements
142type MemoryRateLimiter struct {
143 *RateLimit
144}
145
146func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
147 if capacity <= 0 {
148 capacity = 512 * 1024 * 1024 // 512MB
149 }
150 impl := NewRateLimit(capacity)
151 return &MemoryRateLimiter{RateLimit: impl}
152}