-
Notifications
You must be signed in to change notification settings - Fork 38
/
limiter.go
139 lines (115 loc) · 2.72 KB
/
limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package goreplay
import (
"fmt"
"io"
"math/rand"
"strconv"
"strings"
"time"
)
// Limiter is a wrapper for input or output plugin which adds rate limiting
type Limiter struct {
plugin interface{}
limit int
isPercent bool
currentRPS int
currentTime int64
}
func parseLimitOptions(options string) (limit int, isPercent bool) {
if n := strings.Index(options, "%"); n > 0 {
limit, _ = strconv.Atoi(options[:n])
isPercent = true
} else {
limit, _ = strconv.Atoi(options)
isPercent = false
}
return
}
func newLimiterExceptions(l *Limiter) {
if !l.isPercent {
return
}
speedFactor := float64(l.limit) / float64(100)
// FileInput、KafkaInput have its own rate limiting. Unlike other inputs we not just dropping requests, we can slow down or speed up request emittion.
switch input := l.plugin.(type) {
case *FileInput:
input.speedFactor = speedFactor
case *KafkaInput:
input.speedFactor = speedFactor
}
}
// NewLimiter constructor for Limiter, accepts plugin and options
// `options` allow to sprcify relatve or absolute limiting
func NewLimiter(plugin interface{}, options string) PluginReadWriter {
l := new(Limiter)
l.limit, l.isPercent = parseLimitOptions(options)
l.plugin = plugin
l.currentTime = time.Now().UnixNano()
newLimiterExceptions(l)
return l
}
func (l *Limiter) isLimitedExceptions() bool {
if !l.isPercent {
return false
}
// Fileinput、Kafkainput have its own limiting algorithm
switch l.plugin.(type) {
case *FileInput:
return true
case *KafkaInput:
return true
default:
return false
}
}
func (l *Limiter) isLimited() bool {
if l.isLimitedExceptions() {
return false
}
if l.isPercent {
return l.limit <= rand.Intn(100)
}
if (time.Now().UnixNano() - l.currentTime) > time.Second.Nanoseconds() {
l.currentTime = time.Now().UnixNano()
l.currentRPS = 0
}
if l.currentRPS >= l.limit {
return true
}
l.currentRPS++
return false
}
// PluginWrite writes message to this plugin
func (l *Limiter) PluginWrite(msg *Message) (n int, err error) {
if l.isLimited() {
return 0, nil
}
if w, ok := l.plugin.(PluginWriter); ok {
return w.PluginWrite(msg)
}
// avoid further writing
return 0, io.ErrClosedPipe
}
// PluginRead reads message from this plugin
func (l *Limiter) PluginRead() (msg *Message, err error) {
if r, ok := l.plugin.(PluginReader); ok {
msg, err = r.PluginRead()
} else {
// avoid further reading
return nil, io.ErrClosedPipe
}
if l.isLimited() {
return nil, nil
}
return
}
func (l *Limiter) String() string {
return fmt.Sprintf("Limiting %s to: %d (isPercent: %v)", l.plugin, l.limit, l.isPercent)
}
// Close closes the resources.
func (l *Limiter) Close() error {
if fi, ok := l.plugin.(io.Closer); ok {
fi.Close()
}
return nil
}