注册

看了Kubernetes 源码后,我的Go水平突飞猛进

接口方式隐藏传入参数的细节


当方法的入参是一个结构体的时候,内部去调用时会看到入参过多的细节,这个时候可以将入参隐式转成结构,让内部只看到需要的方法即可。


type Kubelet struct{}

func (kl *Kubelet) HandlePodAdditions(pods []*Pod) {
for _, pod := range pods {
fmt.Printf("create pods : %s\n", pod.Status)
}
}

func (kl *Kubelet) Run(updates <-chan Pod) {
fmt.Println(" run kubelet")
go kl.syncLoop(updates, kl)
}

func (kl *Kubelet) syncLoop(updates <-chan Pod, handler SyncHandler) {
for {
select {
case pod := <-updates:
handler.HandlePodAdditions([]*Pod{&pod})
}
}
}

type SyncHandler interface {
HandlePodAdditions(pods []*Pod)
}

这里我们可以看到 Kubelet 本身有比较多的方法:



  • syncLoop 同步状态的循环
  • Run 用来启动监听循环
  • HandlePodAdditions 处理Pod增加的逻辑

由于 syncLoop 其实并不需要知道 kubelet 上其他的方法,所以通过 SyncHandler 接口的定义,让 kubelet 实现该接口后,外面作为参数传入给 syncLoop ,它就会将类型转换为 SyncHandler


经过转换后 kubelet 上其他的方法在入参里面就看不到了,编码时就可以更加专注在 syncLoop 本身逻辑的编写。


但是这样做同样会带来一些问题,第一次研发的需求肯定是能满足我们的抽象,但是随着需求的增加和迭代,我们在内部需要使用 kubelet 其他未封装成接口的方法时,我们就需要额外传入 kubelet 或者是增加接口的封装,这都会增加我们的编码工作,也破坏了我们最开始的封装。


分层隐藏设计是我们设计的最终目的,在代码设计的过程中让一个局部关注到它需要关注的东西即可。


接口封装方便Mock测试


通过接口的抽象,我们在测试的时候可以把不关注的内容直接实例化成一个 Mock 的结构。


type OrderAPI interface {
GetOrderId() string
}

type realOrderImpl struct{}

func (r *realOrderImpl) GetOrderId() string {
return ""
}

type mockOrderImpl struct{}

func (m *mockOrderImpl) GetOrderId() string {
return "mock"
}

这里如果测试的时候不需要关注 GetOrderId 的方法是否正确,则直接用 mockOrderImpl 初始化 OrderAPI 即可,mock的逻辑也可以进行复杂编码



func TestGetOrderId(t *testing.T) {
orderAPI := &mockOrderImpl{} // 如果要获取订单id,且不是测试的重点,这里直接初始化成mock的结构体
fmt.Println(orderAPI.GetOrderId())
}

gomonkey 也同样能进行测试注入,所以如果以前的代码没能够通过接口封装也同样可以实现mock,而且这种方式更加强大


patches := gomonkey.ApplyFunc(GetOrder, func(orderId string) Order {
return Order{
OrderId: orderId,
OrderState: delivering,
}
})
return func() {
patches.Reset()
}

使用 gomonkey 能够更加灵活的进行 mock , 它能直接设置一个方法的返回值,而接口的抽象只能够处理结构体实例化出来的内容。


接口封装底层多种实现


iptables 、ipvs等的实现就是通过接口的抽象来实现,因为所有网络设置都需要处理 Service 和 Endpoint ,所以抽象了 ServiceHandlerEndpointSliceHandler


// ServiceHandler 是一个抽象接口,用于接收有关服务对象更改的通知。
type ServiceHandler interface {
// OnServiceAdd 在观察到创建新服务对象时调用。
OnServiceAdd(service *v1.Service)
// OnServiceUpdate 在观察到现有服务对象的修改时调用。
OnServiceUpdate(oldService, service *v1.Service)
// OnServiceDelete 在观察到现有服务对象的删除时调用。
OnServiceDelete(service *v1.Service)
// OnServiceSynced 一旦所有初始事件处理程序都被调用并且状态完全传播到本地缓存时调用。
OnServiceSynced()
}

// EndpointSliceHandler 是一个抽象接口,用于接收有关端点切片对象更改的通知。
type EndpointSliceHandler interface {
// OnEndpointSliceAdd 在观察到创建新的端点切片对象时调用。
OnEndpointSliceAdd(endpointSlice *discoveryv1.EndpointSlice)
// OnEndpointSliceUpdate 在观察到现有端点切片对象的修改时调用。
OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discoveryv1.EndpointSlice)
// OnEndpointSliceDelete 在观察到现有端点切片对象的删除时调用。
OnEndpointSliceDelete(endpointSlice *discoveryv1.EndpointSlice)
// OnEndpointSlicesSynced 一旦所有初始事件处理程序都被调用并且状态完全传播到本地缓存时调用。
OnEndpointSlicesSynced()
}

然后通过 Provider 注入即可,


type Provider interface {
config.EndpointSliceHandler
config.ServiceHandler
}

这个也是我在做组件的时候用的最多的一种编码技巧,通过将类似的操作进行抽象,能够在替换底层实现后,上层代码不发生改变。


封装异常处理


我们开启协程之后如果不对异常进行捕获,则会导致协程出现异常后直接 panic ,但是每次写一个 recover 的逻辑做全局类似的处理未免不太优雅,所以通过封装 HandleCrash 方法来实现。


package runtime

var (
ReallyCrash = true
)

// 全局默认的Panic处理
var PanicHandlers = []func(interface{}){logPanic}

// 允许外部传入额外的异常处理
func HandleCrash(additionalHandlers ...func(interface{})) {
if r := recover(); r != nil {
for _, fn := range PanicHandlers {
fn(r)
}
for _, fn := range additionalHandlers {
fn(r)
}
if ReallyCrash {
panic(r)
}
}
}

这里既支持了内部异常的函数处理,也支持外部传入额外的异常处理,如果不想要 Crash 的话也可以自己进行修改。


package runtime

func Go(fn func()) {
go func() {
defer HandleCrash()
fn()
}()
}

要起协程的时候可以通过 Go 方法来执行,这样也避免了自己忘记增加 panic 的处理。


waitgroup的封装


import "sync"

type Gr0up struct {
wg sync.WaitGr0up
}

func (g *Gr0up) Wait() {
g.wg.Wait()
}

func (g *Gr0up) Start(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}

这里最主要的是 Start 方法,内部将 AddDone 进行了封装,虽然只有短短的几行代码,却能够让我们每次使用 waitgroup 的时候不会忘记去对计数器增加一和完成计数器。


信号量触发逻辑封装


type BoundedFrequencyRunner struct {
sync.Mutex

// 主动触发
run chan struct{}

// 定时器限制
timer *time.Timer

// 真正执行的逻辑
fn func()
}

func NewBoundedFrequencyRunner(fn func()) *BoundedFrequencyRunner {
return &BoundedFrequencyRunner{
run: make(chan struct{}, 1),
fn: fn,
timer: time.NewTimer(0),
}
}

// Run 触发执行 ,这里只能够写入一个信号量,多余的直接丢弃,不会阻塞,这里也可以根据自己的需要增加排队的个数
func (b *BoundedFrequencyRunner) Run() {
select {
case b.run <- struct{}{}:
fmt.Println("写入信号量成功")
default:
fmt.Println("已经触发过一次,直接丢弃信号量")
}
}

func (b *BoundedFrequencyRunner) Loop() {
b.timer.Reset(time.Second * 1)
for {
select {
case <-b.run:
fmt.Println("run 信号触发")
b.tryRun()
case <-b.timer.C:
fmt.Println("timer 触发执行")
b.tryRun()
}
}
}

func (b *BoundedFrequencyRunner) tryRun() {
b.Lock()
defer b.Unlock()
// 可以增加限流器等限制逻辑
b.timer.Reset(time.Second * 1)
b.fn()
}

写在最后


感谢你读到这里,如果想要看更多 Kubernetes 的文章可以订阅我的专栏: juejin.cn/column/7321… 。


作者:蔡蔡菜
来源:juejin.cn/post/7347221064429469746

0 个评论

要回复文章请先登录注册