队列通信 chan
构造第一页,及请求参数
请求接口推入总页数
失败重试,重新推入队列
记录错误信息
https://github.com/ph/moby/blob/1.12.x/pkg/pubsub/publisher.go#L22
package pkg
import (
	"sync"
)
// 三方接口拉取抽象处理
type Requester struct{
	requestFunc  RequestFunc
	processFunc  ProcessFunc
	wg           *sync.WaitGroup              // 控制chan退出
	queue        chan RequestFormat
	Retries      int						  // fail retries
	Tasks		 int 						  // set concurrency tasks
}
type RequestFunc func(RequestFormat) (ResponseFormat, error) // 定义接口请求func规范,行参标准请求格式,返回标准响应格式
type ProcessFunc func(ResponseFormat) error // 业务数据处理方法,行参为请准响应格式,return error
// standard request format
type RequestFormat struct{
	Page int
	Limit int
	fails int
	ReqFormat interface{}
}
// standard response format
type ResponseFormat struct{
	page int
	total int
	resFormat interface{}
}
func NewRequester(retries, tasks int) *Requester {
	return &Requester{
		Retries: retries,
		Tasks:  tasks,
		wg: &sync.WaitGroup{},
		queue: make(chan RequestFormat, 1000),
	}
}
func (r *Requester) SetRequestDeal(reqFunc RequestFunc) *Requester {
	r.requestFunc = reqFunc
	return r
}
func (r *Requester) SetDataProcess(repFunc ProcessFunc) *Requester {
	r.processFunc = repFunc
	return r
}
func (r *Requester) InitParams(params interface{}) *Requester {
	reqInfo := RequestFormat{
		Page: 1,
		Limit: 1,
		ReqFormat: params,
	}
	r.produce(reqInfo)
	return r
}
// error deal
// 1. return error slice 同步阻塞 (简单)
// 2. 定义形参error chan ,交给调用程序处理,如果err忘记处理,程序会堵塞
func(r *Requester) Run() (errs []error)  {
	wg := &sync.WaitGroup{}
	for i := 0; i < r.Tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			errs = append(errs, r.pull()...)
		}()
	}
	r.close()
	wg.Wait()
	return errs
}
/**
request pull deal,return error slice
队列再抽象
 */
func(r *Requester) pull() (errs []error) {
	for reqInfo := range r.queue {
		func(reqInfo RequestFormat){
			defer r.wg.Done()
			res, err := r.requestFunc(reqInfo)
			if err != nil {
				if reqInfo.fails > r.Retries {
					errs = append(errs,err)
					return
				}
				reqInfo.fails++
				r.produce(reqInfo)
				return
			}
			// 首页拉取补充剩余页数
			if res.page == 1 {
				for i := 2; i <= res.total; i++ {
					reqInfo.Page = i
					r.produce(reqInfo)
				}
			}
			// data process
			err = r.processFunc(res)
			if err != nil {
				errs = append(errs,err)
			}
		}(reqInfo)
	}
	return errs
}
// sends the request data to channel 
func(r *Requester) produce(d RequestFormat) {
	defer r.wg.Add(1)
	r.queue <- d
}
// close the channel to all consumers
func(r *Requester) close() {
	r.wg.Wait()
	close(r.queue)
}