Что есть и что нужно
С начала разберемся что нам нужно:
— отправка GET/POST/PUT/DELETE запросов
— перебор URL, и POST body
— контроль над открытыми соединениями
— контроль над потоками
— указание продолжительности тестирования
— ограничения по максимальному количеству запросов в секунду
— возможность исключить несколько первых секунд из статистики, чтобы избежать искажения в момент прогрева HTTP сервера
План
— пул соединений
— простые Request/Response
— статистика
— profit
Раз нужно контролировать соединения, стандартный http.Client нам не подходит (да и большой он для такой задачи), умеет слишком много из-за чего страдает производительность. Так как у нас подразумевается несколько потоков воркеров для отправки запросов, то нам нужен пул соединений, которые они будут между собой делить. Воркеру ждать ответа от сервера не имеет смысла, мы просто теряем на это драгоценное время. Как оценить проходящий трафик? Стандартные http.Request, http.Respose такой информации не дают, использовать их не получится, значит нужно реализовывать простой Request/Response, который нам все неоходимое даст. Собирать сырые данные и в конце их агрегировать не получится, так как память не резиновая. Собираем стату на лету.
Поехали
Пул соединений пишем на основе ограниченного канала. Выглядеть он будет как простой пул объектов, взяли объект из канала, поработали, положили обратно.
type Connection struct {
conn net.Conn
manager *ConnectionManager
}
type ConnectionManager struct {
conns chan *Connection
config *Config
}
func NewConnectionManager(config *Config) (result *ConnectionManager) {
result = &ConnectionManager{config: config, conns: make(chan *Connection, config.Connections)}
for i := 0; i < config.Connections; i++ {
connection := &Connection{manager: result}
if connection.Dial() != nil {
ConnectionErrors++
}
result.conns <- connection
}
return
}
func (this *ConnectionManager) Get() *Connection {
return <-this.conns
}
func (this *Connection) Dial() error {
if this.IsConnected() {
this.Disconnect()
}
conn, err := net.Dial("tcp4", this.manager.config.Url.Host)
if err == nil {
this.conn = conn
}
return err
}
func (this *Connection) Disconnect() {
this.conn.Close()
this.conn = nil
}
func (this *Connection) IsConnected() bool {
return this.conn != nil
}
func (this *Connection) Return() {
this.manager.conns <- this
}
Request/Response тут можно почитать исходники Go, посмотреть как реализовано там, и сделать упрощенную аналогию, главным отличием будет возможность получить объем трафика каждого запроса/ответа и сэкономить драгоценное время.
type Request struct {
Method string
URL *url.URL
Header map[string][]string
Body io.Reader
ContentLength int64
Host string
BufferSize int64
}
func (req *Request) Write(w io.Writer) error {
bw := &bytes.Buffer{}
fmt.Fprintf(bw, "%s %s HTTP/1.1\r\n", valueOrDefault(req.Method, "GET"), req.URL.RequestURI())
fmt.Fprintf(bw, "Host: %s\r\n", req.Host)
userAgent := ""
if req.Header != nil {
if ua := req.Header["User-Agent"]; len(ua) > 0 {
userAgent = ua[0]
}
}
if userAgent != "" {
fmt.Fprintf(bw, "User-Agent: %s\r\n", userAgent)
}
if req.Method == "POST" || req.Method == "PUT" {
fmt.Fprintf(bw, "Content-Length: %d\r\n", req.ContentLength)
}
if req.Header != nil {
for key, values := range req.Header {
if key == "User-Agent" || key == "Content-Length" || key == "Host" {
continue
}
for _, value := range values {
fmt.Fprintf(bw, "%s: %s\r\n", key, value)
}
}
}
io.WriteString(bw, "\r\n")
if req.Method == "POST" || req.Method == "PUT" {
bodyReader := bufio.NewReader(req.Body)
_, err := bodyReader.WriteTo(bw)
if err != nil {
return err
}
}
req.BufferSize = int64(bw.Len())
_, err := bw.WriteTo(w)
return err
}
type Response struct {
Status string
StatusCode int
Header map[string][]string
ContentLength int64
BufferSize int64
}
func ReadResponse(r *bufio.Reader) (*Response, error) {
tp := textproto.NewReader(r)
resp := &Response{}
line, err := tp.ReadLine()
if err != nil {
return nil, err
}
f := strings.SplitN(line, " ", 3)
resp.BufferSize += int64(len(f) + 2)
if len(f) < 2 {
return nil, errors.New("Response Header ERROR")
}
reasonPhrase := ""
if len(f) > 2 {
reasonPhrase = f[2]
}
resp.Status = f[1] + " " + reasonPhrase
resp.StatusCode, err = strconv.Atoi(f[1])
if err != nil {
return nil, errors.New("malformed HTTP status code")
}
resp.Header = make(map[string][]string)
for {
line, err := tp.ReadLine()
if err != nil {
return nil, errors.New("Response Header ERROR")
}
resp.BufferSize += int64(len(line) + 2)
if len(line) == 0 {
break
} else {
f := strings.SplitN(line, ":", 2)
resp.Header[f[0]] = append(resp.Header[strings.TrimSpace(f[0])], strings.TrimSpace(f[1]))
}
}
if cl := resp.Header["Content-Length"]; len(cl) > 0 {
i, err := strconv.ParseInt(cl[0], 10, 0)
if err == nil {
resp.ContentLength = i
}
}
buff := make([]byte, resp.ContentLength)
r.Read(buff)
resp.BufferSize += int64(resp.ContentLength)
return resp, nil
}
Для того что бы наши потоки выключились, когда время тестирования закончится, сделаем канал для завершения работы потоков и канал, по которому каждый поток будет сообщать, что он корректно завершил свою работу
WorkerQuit := make(chan bool, *_threads)
WorkerQuited := make(chan bool, *_threads)
засечем время, и также будем ждать Ctr+C(SIGTERM), чтобы наше приложение могло завершить тестирование в любой момент
//Start Ctr+C listen
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
//Wait timers or SIGTERM
select {
case <-time.After(config.Duration):
case <-signalChan:
}
for i := 0; i < config.Threads; i++ {
config.WorkerQuit <- true
}
//Wait for threads complete
for i := 0; i < config.Threads; i++ {
<-config.WorkerQuited
}
Теперь посмотрим на сам воркер: для ограничения по количеству запросов в секунду возьмем для каждого его долю от общего количества, 4 раза в секунду будем увеличивать счетчик и ждать либо освободившиеся соединение либо завершение работы
func NewThread(config *Config) {
timerAllow := time.NewTicker(time.Duration(250) * time.Millisecond)
allow := int32(config.MRQ / 4 / config.Threads)
if config.MRQ == -1 {
allow = 2147483647
} else if allow <= 0 {
allow = 1
}
var connectionErrors int32 = 0
currentAllow := allow
for {
select {
//По таймеру выставляем счетчик на количество разрешенных запросов
case <-timerAllow.C:
currentAllow = allow
//Получаем свободное соединение
case connection := <-config.ConnectionManager.conns:
currentAllow--
//Если разрешенные запросы кончились - возвращаем соединение в пул
if currentAllow < 0 {
connection.Return()
} else {
//Формируем запрос
req := getRequest(config.Method, config.Url, config.Source.GetNext())
//Если нужно переподключаться на каждом запросе
if config.Reconnect && connection.IsConnected() {
connection.Disconnect()
}
//Если соединение разорвано, то пробуем его восстановить
if !connection.IsConnected() {
if connection.Dial() != nil {
connectionErrors++
}
}
//Отправляем запрос если есть соединение, иначе возвращаем соединение
if connection.IsConnected() {
go writeSocket(connection, req, config.RequestStats)
} else {
connection.Return()
}
}
//Ждем завершения
case <-config.WorkerQuit:
//Записываем ошибки по соединениям
atomic.AddInt32(&ConnectionErrors, connectionErrors)
//Подтверждаем завершение
config.WorkerQuited <- true
return
}
}
}
Как только соединение освободится, формируем следующий запрос и запускаем асинхронно отправку его, так по кругу пока не кончится время. После того как запрос отправлен, а ответ прочитан, соединение возвращаем в пул, и поток снова подхватит его.
func writeSocket(connection *Connection, req *http.Request, read chan *RequestStats) {
result := &RequestStats{}
//По окончанию обязательно отправляем статус и отдаем соединение в пул
defer func() {
connection.Return()
read <- result
}()
now := time.Now()
conn := connection.conn
bw := bufio.NewWriter(conn)
//Пишем запрос
err := req.Write(bw)
if err != nil {
result.WriteError = err
return
}
err = bw.Flush()
if err != nil {
result.WriteError = err
return
}
//Ждем ответа
res, err := http.ReadResponse(bufio.NewReader(conn))
if err != nil {
result.ReadError = err
return
}
//Собираем нужную информацию
result.Duration = time.Now().Sub(now)
result.NetOut = req.BufferSize
result.NetIn = res.BufferSize
result.ResponseCode = res.StatusCode
req.Body = nil
}
Осталось дело за малым, собрать статистику из объектов RequestStats и оформить ее
//Вся статистика
type StatsSource struct {
Readed int64
Writed int64
Requests int
Skiped int
Min time.Duration
Max time.Duration
Sum int64
Codes map[int]int
DurationPercent map[time.Duration]int
ReadErrors int
WriteErrors int
Work time.Duration
}
//Статистика для посекундных отчетов
type StatsSourcePerSecond struct {
Readed int64
Writed int64
Requests int
Skiped int
Sum int64
}
//Агрегатор статистики
func StartStatsAggregator(config *Config) {
allowStore := true
allowStoreTime := time.After(config.ExcludeSeconds)
if config.ExcludeSeconds.Seconds() > 0 {
allowStore = false
}
verboseTimer := time.NewTicker(time.Duration(1) * time.Second)
if config.Verbose {
fmt.Printf("%s %s %s %s %s %s\n",
newSpancesFormatRightf("Second", 10, "%s"),
newSpancesFormatRightf("Total", 10, "%s"),
newSpancesFormatRightf("Req/sec", 10, "%s"),
newSpancesFormatRightf("Avg/sec", 10, "%s"),
newSpancesFormatRightf("In/sec", 10, "%s"),
newSpancesFormatRightf("Out/sec", 10, "%s"),
)
} else {
verboseTimer.Stop()
}
source = StatsSource{
Codes: make(map[int]int),
DurationPercent: make(map[time.Duration]int),
}
perSecond := StatsSourcePerSecond{}
start := time.Now()
for {
select {
//Таймер для посекундных отчетов
case <-verboseTimer.C:
if perSecond.Requests-perSecond.Skiped > 0 && config.Verbose {
//Считаем среднее время ответа
avgMilliseconds := perSecond.Sum / int64(perSecond.Requests-perSecond.Skiped)
avg := time.Duration(avgMilliseconds) * time.Millisecond
//Пишем статистику
fmt.Printf("%s %s %s %s %s %s\n",
newSpancesFormatRightf(roundToSecondDuration(time.Now().Sub(start)), 10, "%v"),
newSpancesFormatRightf(source.Requests, 10, "%d"),
newSpancesFormatRightf(perSecond.Requests, 10, "%d"),
newSpancesFormatRightf(avg, 10, "%v"),
newSpancesFormatRightf(Bites(perSecond.Readed), 10, "%s"),
newSpancesFormatRightf(Bites(perSecond.Writed), 10, "%s"),
)
}
//Сбрасываем данные
perSecond = StatsSourcePerSecond{}
//Таймер для разрешения сбора статистики нужен для пропуска на старте
case <-allowStoreTime:
allowStore = true
//Получаем ответ от сервера
case res := <-config.RequestStats:
//Если были ошибки - просто их записываем, остальная информация нам не интересна
if res.ReadError != nil {
source.ReadErrors++
continue
} else if res.WriteError != nil {
source.WriteErrors++
continue
}
//Инкрементируем счетчики
source.Requests++
perSecond.Requests++
perSecond.Readed += res.NetIn
perSecond.Writed += res.NetOut
source.Readed += res.NetIn
source.Writed += res.NetOut
//Собираем статистику по запросам в разрезе HTTP кодов
source.Codes[res.ResponseCode]++
if !allowStore {
perSecond.Skiped++
source.Skiped++
continue
}
//Для среднего времени ответа
sum := int64(res.Duration.Seconds() * 1000)
source.Sum += sum
perSecond.Sum += sum
//Максимальное и минимальное время ответа
if source.Min > res.Duration {
source.Min = roundDuration(res.Duration)
}
if source.Max < res.Duration {
source.Max = roundDuration(res.Duration)
}
//Количество запросов в разрезе времени ответа округленная до 10 миллисекунд
duration := time.Duration(res.Duration.Nanoseconds()/10000000) * time.Millisecond * 10
source.DurationPercent[duration]++
//Завершение сбора статистики
case <-config.StatsQuit:
//Записываем общее время теста
source.Work = time.Duration(time.Now().Sub(start).Seconds()*1000) * time.Millisecond
if config.Verbose {
s := ""
for {
if len(s) >= 61 {
break
}
s += "-"
}
fmt.Println(s)
}
//Подтверждаем завершение
config.StatsQuit <- true
return
}
}
}
Подводим итоги
Как парсить аргументы запуска и форматировать вывод статистики я опущу, так как это не интересно. А теперь давайте проверим, что у нас получилось. Для пробы натравим wrk на Node.js кластер
% ./wrk -c 21 -t 7 -d 30s -L http://localhost:3001/index.html
Running 30s test @ http://localhost:3001/index.html
7 threads and 21 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.09ms 6.55ms 152.07ms 99.63%
Req/Sec 5.20k 3.08k 14.33k 58.75%
Latency Distribution
50% 490.00us
75% 0.89ms
90% 1.83ms
99% 5.04ms
1031636 requests in 30.00s, 153.48MB read
Requests/sec: 34388.25
Transfer/sec: 5.12MB
и тоже самое на go с GOMAXPROCS=1
% ./go-meter -t 7 -c 21 -d 30s -u http://localhost:3001/index.html
Running test threads: 7, connections: 21 in 30s GET http://localhost:3001/index.html
Stats: Min Avg Max
Latency 0 0 83ms
843183 requests in 30s, net: in 103MB, out 62MB
HTTP Codes:
200 100.00%
Latency:
0 99.99%
10ms - 80ms 0.01%
Requests: 28106.10/sec
Net In: 27MBit/sec
Net Out: 17MBit/sec
Transfer: 5.5MB/sec
Получаем 28106 против 34388 запросов в секунду — это примерно на 20% меньше, по сравнению с чистым Cи + event loop + nio. Довольно неплохо, при изменении GOMAXPROCS разницы практически нет, так как большую часть процессорного времени отбирает Node.js.
Минусы:
— потеря 20% производительности, можно попробовать упростить Request/Response, может дать немного производительности
— еще нет поддержи HTTPS
— еще нельзя указать пользовательские HTTP заголовки и timeout
Все исходники тут — Github
Как пользоваться
% go get github.com/a696385/go-meter
% $GOPATH/bin/go-meter -h
Спасибо за внимание!
This entry passed through the Full-Text RSS service — if this is your content and you're reading it on someone else's site, please read the FAQ at fivefilters.org/content-only/faq.php#publishers.
Комментариев нет:
Отправить комментарий