Go package: encapsulating paginated request pulling

2022/01/20

队列通信 chan

构造第一页,及请求参数

请求接口推入总页数

失败重试,重新推入队列

记录错误信息

https://github.com/ph/moby/blob/1.12.x/pkg/pubsub/publisher.go#L22

package pkg

import (
	"sync"
)

// Requester abstracts pulling data from a third-party API: requests are
// queued on a channel, consumed by worker goroutines, and the responses are
// handed to a caller-supplied processing function.
type Requester struct {
	// requestFunc is called by pull for every request taken from the queue.
	requestFunc RequestFunc
	// processFunc is called by pull with every successfully fetched response.
	processFunc ProcessFunc
	// wg tracks outstanding queue items and controls when the channel is
	// closed: close waits on it before closing queue.
	wg *sync.WaitGroup
	// queue carries the pending requests to the workers.
	queue chan RequestFormat
	// Retries is the fail-retry setting compared against a request's failure
	// counter in pull.
	Retries int
	// Tasks is the number of concurrent worker goroutines started by Run.
	Tasks int
}

type (
	// RequestFunc is the contract for performing one API request: it takes
	// the standard request format and returns the standard response format,
	// or an error.
	RequestFunc func(RequestFormat) (ResponseFormat, error)

	// ProcessFunc is the contract for the business-level data handler: it
	// takes the standard response format and returns an error on failure.
	ProcessFunc func(ResponseFormat) error
)

// RequestFormat is the standard request format placed on the queue.
type RequestFormat struct {
	// Page is the page number to request.
	Page int
	// Limit is the page-size value passed along with the request.
	Limit int
	// fails counts how many times this request has already failed.
	fails int
	// ReqFormat carries the caller-defined request parameters.
	ReqFormat interface{}
}

// ResponseFormat is the standard response format returned by a RequestFunc.
//
// NOTE(review): every field is unexported, so code outside this package
// cannot populate a ResponseFormat — confirm that RequestFunc implementations
// are meant to live inside this package.
type ResponseFormat struct {
	// page is the page number this response belongs to.
	page int
	// total is the total number of pages; pull uses it on the first page to
	// enqueue the remaining pages.
	total int
	// resFormat carries the caller-defined response payload.
	resFormat interface{}
}

// NewRequester returns a Requester configured with the given retry setting
// and number of concurrent tasks. The request queue is created with a buffer
// of 1000 entries; the request and process functions still have to be set
// with SetRequestDeal and SetDataProcess.
func NewRequester(retries, tasks int) *Requester {
	req := new(Requester)
	req.Retries = retries
	req.Tasks = tasks
	req.wg = new(sync.WaitGroup)
	req.queue = make(chan RequestFormat, 1000)
	return req
}

// SetRequestDeal registers the function used to perform each API request.
// It returns the receiver so calls can be chained.
func (r *Requester) SetRequestDeal(reqFunc RequestFunc) *Requester {
	r.requestFunc = reqFunc

	return r
}

// SetDataProcess registers the function that handles each fetched response.
// It returns the receiver so calls can be chained.
func (r *Requester) SetDataProcess(repFunc ProcessFunc) *Requester {
	r.processFunc = repFunc

	return r
}

// InitParams seeds the queue with the first-page request (Page 1, Limit 1)
// carrying the caller-defined params, and returns the receiver for chaining.
func (r *Requester) InitParams(params interface{}) *Requester {
	r.produce(RequestFormat{Page: 1, Limit: 1, ReqFormat: params})
	return r
}

// Run starts the worker goroutines, blocks until the queue has been drained
// and closed, and returns every error the workers collected.
//
// Error handling options that were considered:
//  1. return an error slice, blocking synchronously (simple; this is what
//     Run does);
//  2. take an error channel parameter and let the caller handle it — but if
//     the caller forgets to drain it, the program blocks.
//
// At least one worker is always started: with zero workers nothing would
// consume the queue and the close step below would wait forever.
func (r *Requester) Run() (errs []error) {
	workers := r.Tasks
	if workers < 1 {
		workers = 1
	}

	var (
		mu      sync.Mutex // guards errs, which every worker appends to
		running sync.WaitGroup
	)
	for i := 0; i < workers; i++ {
		running.Add(1)
		go func() {
			defer running.Done()
			workerErrs := r.pull()
			if len(workerErrs) == 0 {
				return
			}
			mu.Lock()
			errs = append(errs, workerErrs...)
			mu.Unlock()
		}()
	}

	// Wait for all queued work to finish, then close the queue so the
	// workers' range loops terminate.
	r.close()
	running.Wait()
	return errs
}


// pull is the worker loop: it consumes requests from the queue until the
// queue is closed and returns the errors it collected.
//
// For each request it calls requestFunc. A failed request is pushed back
// onto the queue until it has been retried Retries times; after that the
// error is recorded. When the first page succeeds, the remaining pages
// (2..total) are queued. Every successful response is passed to processFunc,
// whose error, if any, is recorded as well.
//
// TODO: abstract the queue handling further.
func (r *Requester) pull() (errs []error) {
	for reqInfo := range r.queue {
		func(reqInfo RequestFormat) {
			// Mark this queue item as finished only after any follow-up
			// items have been queued, so the outstanding count never drops
			// to zero while work is still being produced.
			defer r.wg.Done()

			res, err := r.requestFunc(reqInfo)
			if err != nil {
				// fails counts the retries already spent, so stop once it
				// reaches the configured number of retries.
				if reqInfo.fails >= r.Retries {
					errs = append(errs, err)
					return
				}
				reqInfo.fails++
				r.produce(reqInfo)
				return
			}

			// The first page reports the total, so queue the remaining pages.
			if res.page == 1 {
				next := reqInfo
				// Each page gets its own retry budget; do not inherit the
				// failures the first page accumulated.
				next.fails = 0
				for page := 2; page <= res.total; page++ {
					next.Page = page
					r.produce(next)
				}
			}

			// Hand the response to the business-level processor.
			if err := r.processFunc(res); err != nil {
				errs = append(errs, err)
			}
		}(reqInfo)
	}
	return errs
}

// produce registers one more outstanding item and sends the request data to
// the queue channel.
//
// The WaitGroup is incremented before the send: once the item is on the
// channel a consumer may finish it and call Done at any moment, so counting
// it afterwards could let the outstanding count reach zero (and the queue be
// closed) while work is still pending.
//
// produce never blocks the caller. Workers call it while they are the only
// consumers of the queue, so a blocking send into a full buffer could stall
// every worker; when the buffer is full the send is handed to a goroutine
// instead. That goroutine always finishes before the queue is closed, because
// its item is already counted in the WaitGroup.
func (r *Requester) produce(d RequestFormat) {
	r.wg.Add(1)
	select {
	case r.queue <- d:
	default:
		go func() { r.queue <- d }()
	}
}

// close blocks until the WaitGroup reports no outstanding queue items, then
// closes the queue channel, which ends the range loop of every consumer.
func (r *Requester) close() {
	r.wg.Wait()

	close(r.queue)
}

Search

    Table of Contents