0%

go进行类似函数式/链式的数据map/reduce处理思考

一直在思考在go中怎么进行函数式那样的编程, 以实现类似Flink中的那种流数据处理. 后来想了想, 这其实也是如何实现map/reduce. 于是我有了如下的一些代码实验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
type fn func(d interface{}) (interface{}, error)

type F interface {
Do(f fn) F
}

type Result struct {
Data interface{}
Err error
}

func (r *Result) Do(f fn) *Result {
if r.Err != nil {
return r
}
r.Data, r.Err = f(r.Data)
return r
}

func main() {
data := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
filter1 := func(i interface{}) (interface{}, error) {
if i.(int) > 1 {
return i, nil
}
return nil, fmt.Errorf("little than 1: %v", i)
}
filter2 := func(i interface{}) (interface{}, error) {
if i.(int) > 4 {
return i, nil
}
return nil, fmt.Errorf("little than 4: %v", i)
}
filter3 := func(i interface{}) (interface{}, error) {
if i.(int)&1 == 0 {
return i, nil
}
return nil, fmt.Errorf("even number: %v", i)
}
c := make(chan *Result, 10)
for _, d := range data {
go func(d interface{}) {
r := Result{
Data: d,
Err: nil,
}
c <- r.Do(filter1).Do(filter2).Do(filter3)
}(d)
}
for i := 0; i < 10; i++ {
d := <-c
if d.Err != nil {
fmt.Println(d.Err.Error())
} else {
fmt.Println(d.Data.(int))
}
}
}

// $ go run test.go
// little than 4: 2
// 6
// even number: 7
// 8
// little than 4: 3
// little than 1: 0
// even number: 9
// little than 4: 4
// little than 1: 1
// even number: 5

似乎可以做到, 但是数据类似的断言确实有点痛, 看来想彻底优雅的实现还需要泛型的, 估计等1.8或go2中泛型出来了, 会有很多这样的实现或代码库出现.

不过r.Do(filter1).Do(filter2).Do(filter3)这里的实现并没有做到在遇到错误时及时短路, 想想&&||可以做到及时短路, 于是就有了如下实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
type fn func(d interface{}) (interface{}, error)

type F interface {
Do(f fn) bool
}

type Result struct {
Data interface{}
Err error
}

func (r *Result) Do(f fn) bool {
if r.Err != nil {
return false
}
r.Data, r.Err = f(r.Data)
return true
}

func main() {
data := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
filter1 := func(i interface{}) (interface{}, error) {
if i.(int) > 1 {
return i, nil
}
return nil, fmt.Errorf("little than 1: %v", i)
}
filter2 := func(i interface{}) (interface{}, error) {
if i.(int) > 4 {
return i, nil
}
return nil, fmt.Errorf("little than 4: %v", i)
}
filter3 := func(i interface{}) (interface{}, error) {
if i.(int)&1 == 0 {
return i, nil
}
return nil, fmt.Errorf("even number: %v", i)
}
c := make(chan *Result, 10)
for _, d := range data {
go func(d interface{}) {
r := Result{
Data: d,
Err: nil,
}
_ = r.Do(filter1) && r.Do(filter2) && r.Do(filter3)
c <- &r
}(d)
}
for i := 0; i < 10; i++ {
d := <-c
if d.Err != nil {
fmt.Println(d.Err.Error())
} else {
fmt.Println(d.Data.(int))
}
}
}

// $ go run test.go
// 6
// even number: 7
// 8
// little than 1: 0
// little than 4: 3
// little than 4: 2
// little than 4: 4
// even number: 9
// little than 1: 1
// even number: 5

从这个demo中可以看出, 错误的处理确实是一个痛点, 如果能像Rust那样处理就很爽了, 所以我又搞了如下版本, 只返回一个数据, 不返回error, 而在reduce中判断switch判断是否遇到了error:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
type fn func(d interface{}) interface{}

type F interface {
Do(f fn) bool
}

type Result struct {
D interface{}
}

func (r *Result) Do(f fn) bool {
if e, ok := r.D.(error); ok && e != nil {
return false
}
r.D = f(r.D)
return true
}

func main() {
data := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
filter1 := func(i interface{}) interface{} {
if i.(int) > 1 {
return i
}
return fmt.Errorf("little than 1: %v", i)
}
filter2 := func(i interface{}) interface{} {
if i.(int) > 4 {
return i
}
return fmt.Errorf("little than 4: %v", i)
}
filter3 := func(i interface{}) interface{} {
if i.(int)&1 == 0 {
return i
}
return fmt.Errorf("even number: %v", i)
}
c := make(chan *Result, 10)
for _, d := range data {
go func(d interface{}) {
r := Result{
D: d,
}
_ = r.Do(filter1) && r.Do(filter2) && r.Do(filter3)
c <- &r
}(d)
}
for i := 0; i < 10; i++ {
d := <-c
switch d.D.(type) {
case error:
fmt.Println("err:",d.D)
default:
fmt.Println("num:",d.D)
}
}
}
// $ go run test.go
// err: even number: 9
// err: even number: 5
// num: 6
// err: even number: 7
// num: 8
// err: little than 4: 2
// err: little than 4: 3
// err: little than 1: 0
// err: little than 4: 4
// err: little than 1: 1

似乎有那么点Rust的味道了, 也发现如果泛型出来, filter的函数的map/reduce写起来或许会爽一点