Golang实现读写缓冲池

读写缓冲池本质上是生产者-消费者模型的实际应用,目的是避免频繁分配和释放内存:复用最初new出来的固定数目的缓冲slice,从而减少GC压力。

Channel+WaitGroup

Golang并发基本都是这两者搭配使用,通过for+select循环监听发布事件、订阅事件、取消事件。

AsyncReader

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// AsyncReader is the asynchronous block reader interface.
// Its core methods are Begin and Next: Begin writes data into the blockBufs,
// and Next reads the data back out of them.
type AsyncReader interface {
// Begin starts the background read that fills the block buffers.
Begin() error
// Stop has no implementation in the visible code; presumably it ends the
// background read — TODO confirm against the implementation.
Stop()
// Next returns the next block of buffered data.
Next() ([]byte, error)
// SetTimeoutMs configures the timeout; judging by the name t is in
// milliseconds — TODO confirm against the implementation.
SetTimeoutMs(t uint32)
}

// asyncReader holds the state shared by NewAsyncReader, Begin, ReadOneBlock
// and Next: the data source, the block buffers, and the channels that
// coordinate the background reading goroutine with the caller of Next.
type asyncReader struct {
	// Data source.
	reader io.Reader

	// logId is the identifier ReadOneBlock puts into its log line.
	logId uint32

	// Buffer configuration.
	blockNum       uint32 // number of blocks actually allocated
	bufferBlockNum uint32 // blockNum-1: number of blocks used for read-ahead
	blockSize      uint32 // size in bytes of each block
	firstBlockSize uint32
	clientTimeout  time.Duration // how long Next waits for a read event
	innerTimeout   time.Duration // how long the Begin goroutine waits for a receive signal

	// Buffers.
	blocks []blockBuf

	// Channels and events.
	// struct{} occupies no memory, so it is commonly used as a pure signal.
	chRecieve   chan struct{} // signals that one more block may be filled from reader
	chStop      chan struct{} // tells the Begin goroutine to terminate
	chReadEvent chan event    // announces filled blocks (or errors) to Next
	isEof       bool          // set by Next once an EOF event has been consumed
}

// blockBuf is one cache block.
type blockBuf struct {
// data stays nil until ReadOneBlock first fills this block, at which point
// it is allocated with blockSize bytes.
data []byte
}

// event is the message the Begin goroutine sends to Next over chReadEvent.
// Begin builds it with positional literals, so the field order must not change.
type event struct {
/* When err != nil, the values of readBlockIdx, readLen and isEof are undefined. */
readBlockIdx uint32 // index into blocks of the block that was filled
readLen uint32 // number of valid bytes in that block
isEof bool // true when the reader reported EOF
err error
}

核心函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// NewAsyncReader builds an asyncReader that reads from reader into blockNum
// read-ahead blocks of blockSize bytes each. logId is kept for the log line
// written by ReadOneBlock.
//
// asyncReader uses several blocks as a pool for asynchronous read-ahead.
// When the caller invokes Next, the underlying memory of a block is handed
// out as a slice, so one extra block is allocated to keep the block that is
// currently being filled from being the one the caller is reading (which
// would be a race):
//   - this.blockNum is the number of blocks actually allocated;
//   - this.bufferBlockNum is the number of blocks used for read-ahead.
//
// The default timeout is set through SetTimeoutMs(1000).
func NewAsyncReader(reader io.Reader, blockSize uint32, blockNum uint32, logId uint32) *asyncReader {
	this := &asyncReader{}

	// Store the constructor arguments. Without these assignments
	// ReadOneBlock would call Read on a nil reader and every block would
	// have size 0.
	this.reader = reader
	this.blockSize = blockSize
	this.logId = logId

	this.blockNum = blockNum + 1
	this.bufferBlockNum = this.blockNum - 1

	// make leaves every data slice nil; ReadOneBlock allocates a block's
	// storage the first time that block is filled.
	this.blocks = make([]blockBuf, this.blockNum)

	this.chRecieve = make(chan struct{}, this.bufferBlockNum)
	this.chStop = make(chan struct{}, 1)
	this.chReadEvent = make(chan event, this.blockNum)

	// Defaults. isEof starts as false, its zero value.
	this.firstBlockSize = this.blockSize
	this.SetTimeoutMs(1000)
	return this
}

// Begin starts the asynchronous read.
//
// It first queues bufferBlockNum receive signals and then starts a goroutine.
// For every receive signal the goroutine fills one block through
// ReadOneBlock and publishes a read event on chReadEvent, which Next picks
// up. The goroutine ends when a stop signal arrives, when no receive signal
// arrives within innerTimeout, when ReadOneBlock fails, or at EOF.
//
// Begin itself always returns nil; read failures are reported through Next.
func (this *asyncReader) Begin() error {
	for i := uint32(0); i < this.bufferBlockNum; i++ {
		this.chRecieve <- struct{}{}
	}

	go func() {
		blockIdx := uint32(0)
		for {
			select {
			case <-this.chStop: // stop signal
				return
			case <-time.After(this.innerTimeout):
				// Per the original author's note, the goroutine's default
				// timeout while waiting for the client to finish reading is
				// twice the client timeout.
				this.chReadEvent <- NewEvent(fmt.Errorf("asyncReader begin() timeout"))
				return
			case <-this.chRecieve: // receive signal: one more block may be filled
			}

			readLen, isEof, err := this.ReadOneBlock(blockIdx, this.blockSize)
			if err != nil {
				this.chReadEvent <- NewEvent(err)
				return
			}
			// Each successfully filled block is announced with a read event
			// that Next fetches later.
			if readLen > 0 {
				this.chReadEvent <- event{blockIdx, readLen, isEof, nil}
				blockIdx = (blockIdx + 1) % this.blockNum // continue with the next block
			}
			if isEof {
				// Trailing empty EOF event so that Next observes EOF even
				// when the last read returned no data.
				this.chReadEvent <- event{0, 0, true, nil}
				return
			}
		}
	}()
	return nil
}

// ReadOneBlock reads from the underlying reader into blocks[blockIdx] until
// toReadLen bytes have been collected or the reader reports EOF.
//
// It returns (readLen, isEof, error):
//   - readLen is the number of bytes now stored in the block;
//   - isEof is true when the reader returned io.EOF (error is nil then), and
//     is also true when any other read error is returned;
//   - error is the non-EOF read error, which is logged before returning.
func (this *asyncReader) ReadOneBlock(blockIdx uint32, toReadLen uint32) (uint32, bool, error) {
	filled := uint32(0)
	for filled < toReadLen {
		blk := &this.blocks[blockIdx]
		// The block's storage is allocated the first time it is filled.
		if blk.data == nil {
			blk.data = make([]byte, this.blockSize)
		}

		n, err := this.reader.Read(blk.data[filled:toReadLen])
		if n > 0 {
			filled += uint32(n)
		}

		switch {
		case err == io.EOF:
			return filled, true, nil
		case err != nil:
			log.Logger.Warn("[logid:%d] async reader readLen:%d, error:%s",
				this.logId, filled, err)
			return filled, true, err
		}
	}
	return filled, false, nil
}

// Next returns the next block of buffered data.
//
//  1. asyncReader supports only a single goroutine reading data.
//  2. The returned slice is asyncReader's underlying buffer; its contents are
//     valid only until the next call to Next. The slice must not be written
//     to; copy it if you need to modify the data.
//  3. Next is normally called in a for loop until the returned data is nil.
//
// It returns TimeOutError when no read event arrives within clientTimeout,
// and a wrapped error when the reading goroutine reported one.
func (this *asyncReader) Next() ([]byte, error) {
	if this.isEof {
		return nil, nil
	}

	var ev event
	select {
	case <-time.After(this.clientTimeout):
		return nil, TimeOutError
	case ev = <-this.chReadEvent:
	}

	if ev.err != nil {
		return nil, fmt.Errorf("asyncReaderAll.next error. %s", ev.err)
	}

	if !ev.isEof {
		// Signal that buffering may continue: the goroutine started by Begin
		// reads the next block from reader into the cache. If blockNum were
		// equal to bufferBlockNum, a race would occur here.
		this.chRecieve <- struct{}{}
		return this.blocks[ev.readBlockIdx].data[0:ev.readLen], nil
	}

	// EOF: remember it so that later calls return immediately.
	this.isEof = true
	if ev.readLen == 0 {
		return nil, nil
	}
	return this.blocks[ev.readBlockIdx].data[0:ev.readLen], nil
}

BufferPool

BufferPool是一个读写缓冲池,其中Bodybufs是缓冲区,由多个slice组成。

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// BufferPool is a read/write buffer pool: Write publishes a buffer, Read
// takes the next published buffer, and SendCanWriteSignal lets the next Write
// proceed.
type BufferPool interface {
// Write publishes start together with length and err for a later Read.
Write(start []byte, length int, err error) BosErrorCodeType
// Read returns the next published buffer, its length, and its error.
Read() ([]byte, int, error)
// SendCanWriteSignal sends signal to the writer side.
SendCanWriteSignal(signal int) BosErrorCodeType
}

// BufferPoolImp is the state used by the Write, Read and SendCanWriteSignal
// methods.
// NOTE(review): those methods read bp.logId, but no logId field is declared
// in this struct — confirm the field and its type against the real definition.
type BufferPoolImp struct {
Bodybufs []BodyBuf // the buffer slots
bufWraps chan BufWrap // read/write channel carrying references to buffer contents
writeSignals chan int // decides whether writing is allowed
currentIndex int // index of the Bodybufs slot the next Write fills
maxBufferSize int // modulus for currentIndex and upper bound checked by Read
timeout int // timeout in seconds (multiplied by time.Second at the use sites)
needMallocMemory bool //pre malloc memory or not
}

// BodyBuf is one slot of BufferPoolImp.Bodybufs. Write builds it with a
// positional literal around the caller's slice.
type BodyBuf struct {
data []byte
}

// BufWrap is the message Write sends to Read over bufWraps. Write builds it
// with a positional literal, so the field order must not change.
type BufWrap struct {
index int // index into Bodybufs
n int // number of bytes read from r.Body, per the original note
err error // error passed to Write, handed through to Read
}

核心函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Write publishes start as the next buffer for Read.
//
// It first waits (up to twice bp.timeout seconds) for a write signal. A
// non-zero signal is returned to the caller converted to BosErrorCodeType.
// It then stores start in the current Bodybufs slot and sends a BufWrap
// describing that slot on bufWraps, which Read listens on, waiting up to
// bp.timeout seconds. Either wait timing out yields CODE_INTERNAL_ERROR;
// on success CODE_OK is returned and currentIndex advances.
func (bp *BufferPoolImp) Write(start []byte, length int, err error) BosErrorCodeType {
	signalWait := time.Duration(bp.timeout) * time.Second * 2
	select {
	case code := <-bp.writeSignals:
		log.Logger.Debug("[logid:%d] read can write signal ok: %d", bp.logId, code)
		if code != 0 {
			return BosErrorCodeType(code)
		}
	case <-time.After(signalWait):
		log.Logger.Error("[logid:%d] wait writeSignal timeout", bp.logId)
		return CODE_INTERNAL_ERROR
	}

	// Store start in the current slot and announce it on bufWraps.
	slot := bp.currentIndex
	bp.Bodybufs[slot] = BodyBuf{start}

	sendWait := time.Duration(bp.timeout) * time.Second
	select {
	case bp.bufWraps <- BufWrap{slot, length, err}:
		log.Logger.Debug("[logid:%d] write buffers, index[%d]", bp.logId, slot)
		bp.currentIndex = (slot + 1) % bp.maxBufferSize
	case <-time.After(sendWait):
		log.Logger.Error("[logid:%d] write buffers timeout", bp.logId)
		return CODE_INTERNAL_ERROR
	}

	return CODE_OK
}

// Read returns the next buffer published by Write, together with its length
// and the error that was passed to Write. It is normally called in a for
// loop until the returned data is nil.
//
// If nothing is published within bp.timeout seconds it returns
// (nil, -1, ReadTimeOutError). If the received index lies outside
// [0, maxBufferSize-1] it returns (nil, -1, err) with the received error.
func (bp *BufferPoolImp) Read() ([]byte, int, error) {
	var wrap BufWrap
	select {
	case wrap = <-bp.bufWraps: // take what the writer published
	case <-time.After(time.Duration(bp.timeout) * time.Second):
		log.Logger.Error("[logid:%d] read buffers timeout", bp.logId)
		return nil, -1, ReadTimeOutError
	}

	log.Logger.Debug("[logid:%d] read buffers, index[%d], len[%d]",
		bp.logId, wrap.index, wrap.n)

	inRange := wrap.index >= 0 && wrap.index <= bp.maxBufferSize-1
	if !inRange {
		return nil, -1, wrap.err
	}
	return bp.Bodybufs[wrap.index].data, wrap.n, wrap.err
}

// SendCanWriteSignal sends signal on writeSignals so that Write can go on
// filling buffers; it has to be called alongside Read. Because it is called
// after Read, the race that is possible in async_reader does not arise here.
//
// It returns CODE_OK once the signal is sent, or CODE_INTERNAL_ERROR if the
// send does not complete within twice bp.timeout seconds.
func (bp *BufferPoolImp) SendCanWriteSignal(signal int) BosErrorCodeType {
	wait := time.Duration(bp.timeout) * time.Second * 2
	select {
	case <-time.After(wait):
		log.Logger.Error("[logid:%d] write writeSignal timeout", bp.logId)
		return CODE_INTERNAL_ERROR
	case bp.writeSignals <- signal:
		log.Logger.Debug("[logid:%d] send can write signal ok", bp.logId)
		return CODE_OK
	}
}