GoStream实现BufferChan

上一篇文章中,我实现了Go BufferChan。但BufferChan作为一个独立库,实际使用上并不方便,所以想着把BufferChan整合进GoStream中。

原理是,将上一篇文章的实现原理封闭在在GoStream的BufferChan()中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/// chan.go

// BufferChan 对channel运行缓存
// 当接收消息达到`size`或超过`timeout`未收到新消息时,发送消息
// 参数说明
// typ slice类型参数
// size 缓存数量
// timeout 超时时间
func (s Stream) BufferChan(typ interface{}, size int, timeout time.Duration) Stream {
t := reflect.TypeOf(typ)
if t.Kind() != reflect.Slice {
panic("typ should be slice")
}
if size <= 0 {
panic("size should gt 0")
}
if timeout <= 0 {
panic("timeout should gt 0")
}

in := make(chan interface{})
out := make(chan interface{})
go s.OutChan(in)

go func() {
sv := reflect.MakeSlice(t, size, size)
idx := 0

var brush = func() {
out <- sv.Slice(0, idx).Interface()
sv = reflect.MakeSlice(t, size, size)
idx = 0
}

for {
select {
case v, ok := <-in:
if ok {
sv.Index(idx).Set(reflect.ValueOf(v))
idx++
if idx == size {
brush()
}
} else {
if idx > 0 {
brush()
}
close(out)
return
}
case <-time.After(timeout):
if idx > 0 {
brush()
}
}
}
}()

return From(out)
}

加点测试验证下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/// chan_test.go

func TestBufferChanBySize(t *testing.T) {
in := make(chan int)
go func() {
for i := 0; i < 5; i++ {
in <- i
}
close(in)
}()

want := [][]int{{0, 1, 2}, {3, 4}}
got := From(in).BufferChan([]int{}, 3, time.Second).Collect(ToSlice([][]int{}))
assert.Equal(t, want, got)
}

func TestBufferChanByTimeout(t *testing.T) {
in := make(chan int)
go func() {
in <- 0
time.Sleep(time.Millisecond * 500)
in <- 1
in <- 2
time.Sleep(time.Millisecond * 500)
in <- 3
in <- 4
close(in)
}()
want := [][]int{{0}, {1, 2}, {3, 4}}
got := From(in).BufferChan([]int{}, 100, time.Millisecond*300).Collect(ToSlice([][]int{}))
assert.Equal(t, want, got)
}


GoStream实现BufferChan
https://a3d21.github.io/2022/01/28/2022-01-28-gostream-buffer-channel/
作者
a3d21
发布于
2022年1月28日
许可协议