主頁 > 知識庫 > Go語言同步與異步執(zhí)行多個(gè)任務(wù)封裝詳解(Runner和RunnerAsync)

Go語言同步與異步執(zhí)行多個(gè)任務(wù)封裝詳解(Runner和RunnerAsync)

熱門標(biāo)簽:阿里云ai電話機(jī)器人 濱州自動(dòng)電銷機(jī)器人排名 黃岡人工智能電銷機(jī)器人哪個(gè)好 汕頭小型外呼系統(tǒng) 浙江高頻外呼系統(tǒng)多少錢一個(gè)月 建造者2地圖標(biāo)注 釘釘有地圖標(biāo)注功能嗎 惠州電銷防封電話卡 鄭州亮點(diǎn)科技用的什么外呼系統(tǒng)

前言

同步適合多個(gè)連續(xù)執(zhí)行的,每一步的執(zhí)行依賴于上一步操作,異步執(zhí)行則和任務(wù)執(zhí)行順序無關(guān)(如從10個(gè)站點(diǎn)抓取數(shù)據(jù))

同步執(zhí)行類RunnerAsync

支持返回超時(shí)檢測,系統(tǒng)中斷檢測

錯(cuò)誤常量定義

//超時(shí)錯(cuò)誤
var ErrTimeout = errors.New("received timeout")
//操作系統(tǒng)系統(tǒng)中斷錯(cuò)誤
var ErrInterrupt = errors.New("received interrupt")

實(shí)現(xiàn)代碼如下

package task
import (
 "os"
 "time"
 "os/signal"
 "sync"
)
 
//異步執(zhí)行任務(wù)
type Runner struct {
 //操作系統(tǒng)的信號檢測
 interrupt chan os.Signal
 //記錄執(zhí)行完成的狀態(tài)
 complete chan error
 //超時(shí)檢測
 timeout -chan time.Time
 //保存所有要執(zhí)行的任務(wù),順序執(zhí)行
 tasks []func(id int) error
 waitGroup sync.WaitGroup
 lock sync.Mutex
 errs []error
}
 
//new一個(gè)Runner對象
func NewRunner(d time.Duration) *Runner {
 return Runner{
 interrupt: make(chan os.Signal, 1),
 complete: make(chan error),
 timeout: time.After(d),
 waitGroup: sync.WaitGroup{},
 lock: sync.Mutex{},
 }
}
 
//添加一個(gè)任務(wù)
func (this *Runner) Add(tasks ...func(id int) error) {
 this.tasks = append(this.tasks, tasks...)
}
 
//啟動(dòng)Runner,監(jiān)聽錯(cuò)誤信息
func (this *Runner) Start() error {
 //接收操作系統(tǒng)信號
 signal.Notify(this.interrupt, os.Interrupt)
 //并發(fā)執(zhí)行任務(wù)
 go func() {
 this.complete - this.Run()
 }()
 select {
 //返回執(zhí)行結(jié)果
 case err := -this.complete:
 return err
 //超時(shí)返回
 case -this.timeout:
 return ErrTimeout
 }
}
 
//異步執(zhí)行所有的任務(wù)
func (this *Runner) Run() error {
 for id, task := range this.tasks {
 if this.gotInterrupt() {
  return ErrInterrupt
 }
 this.waitGroup.Add(1)
 go func(id int) {
  this.lock.Lock()
  //執(zhí)行任務(wù)
  err := task(id)
  //加鎖保存到結(jié)果集中
  this.errs = append(this.errs, err)
 
  this.lock.Unlock()
  this.waitGroup.Done()
 }(id)
 }
 this.waitGroup.Wait()
 
 return nil
}
 
//判斷是否接收到操作系統(tǒng)中斷信號
func (this *Runner) gotInterrupt() bool {
 select {
 case -this.interrupt:
 //停止接收別的信號
 signal.Stop(this.interrupt)
 return true
 //正常執(zhí)行
 default:
 return false
 }
}
 
//獲取執(zhí)行完的error
func (this *Runner) GetErrs() []error {
 return this.errs
}

使用方法    

Add添加一個(gè)任務(wù),任務(wù)為接收int類型的一個(gè)閉包

Start開始執(zhí)行傷,返回一個(gè)error類型,nil為執(zhí)行完畢, ErrTimeout代表執(zhí)行超時(shí),ErrInterrupt代表執(zhí)行被中斷(類似Ctrl + C操作)

測試示例代碼

package task
import (
 "testing"
 "time"
 "fmt"
 "os"
 "runtime"
)
 
func TestRunnerAsync_Start(t *testing.T) {
 //開啟多核
 runtime.GOMAXPROCS(runtime.NumCPU())
 //創(chuàng)建runner對象,設(shè)置超時(shí)時(shí)間
 runner := NewRunnerAsync(8 * time.Second)
 //添加運(yùn)行的任務(wù)
 runner.Add(
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 )
 fmt.Println("同步執(zhí)行任務(wù)")
 //開始執(zhí)行任務(wù)
 if err := runner.Start(); err != nil {
 switch err {
 case ErrTimeout:
  fmt.Println("執(zhí)行超時(shí)")
  os.Exit(1)
 case ErrInterrupt:
  fmt.Println("任務(wù)被中斷")
  os.Exit(2)
 }
 }
 t.Log("執(zhí)行結(jié)束")
}
 
//創(chuàng)建要執(zhí)行的任務(wù)
func createTaskAsync() func(id int) {
 return func(id int) {
 fmt.Printf("正在執(zhí)行%v個(gè)任務(wù)\n", id)
 //模擬任務(wù)執(zhí)行,sleep兩秒
 //time.Sleep(1 * time.Second)
 }
}

執(zhí)行結(jié)果  

同步執(zhí)行任務(wù)
正在執(zhí)行0個(gè)任務(wù)
正在執(zhí)行1個(gè)任務(wù)
正在執(zhí)行2個(gè)任務(wù)
正在執(zhí)行3個(gè)任務(wù)
正在執(zhí)行4個(gè)任務(wù)
正在執(zhí)行5個(gè)任務(wù)
正在執(zhí)行6個(gè)任務(wù)
正在執(zhí)行7個(gè)任務(wù)
正在執(zhí)行8個(gè)任務(wù)
正在執(zhí)行9個(gè)任務(wù)
正在執(zhí)行10個(gè)任務(wù)
正在執(zhí)行11個(gè)任務(wù)
正在執(zhí)行12個(gè)任務(wù)
 runnerAsync_test.go:49: 執(zhí)行結(jié)束

異步執(zhí)行類Runner

支持返回超時(shí)檢測,系統(tǒng)中斷檢測

實(shí)現(xiàn)代碼如下

package task
import (
 "os"
 "time"
 "os/signal"
 "sync"
)
 
//異步執(zhí)行任務(wù)
type Runner struct {
 //操作系統(tǒng)的信號檢測
 interrupt chan os.Signal
 //記錄執(zhí)行完成的狀態(tài)
 complete chan error
 //超時(shí)檢測
 timeout -chan time.Time
 //保存所有要執(zhí)行的任務(wù),順序執(zhí)行
 tasks []func(id int) error
 waitGroup sync.WaitGroup
 lock sync.Mutex
 errs []error
}
 
//new一個(gè)Runner對象
func NewRunner(d time.Duration) *Runner {
 return Runner{
  interrupt: make(chan os.Signal, 1),
  complete: make(chan error),
  timeout: time.After(d),
  waitGroup: sync.WaitGroup{},
  lock:  sync.Mutex{},
 }
}
 
//添加一個(gè)任務(wù)
func (this *Runner) Add(tasks ...func(id int) error) {
 this.tasks = append(this.tasks, tasks...)
}
 
//啟動(dòng)Runner,監(jiān)聽錯(cuò)誤信息
func (this *Runner) Start() error {
 //接收操作系統(tǒng)信號
 signal.Notify(this.interrupt, os.Interrupt)
 //并發(fā)執(zhí)行任務(wù)
 go func() {
  this.complete - this.Run()
 }()
 select {
 //返回執(zhí)行結(jié)果
 case err := -this.complete:
  return err
  //超時(shí)返回
 case -this.timeout:
  return ErrTimeout
 }
}
 
//異步執(zhí)行所有的任務(wù)
func (this *Runner) Run() error {
 for id, task := range this.tasks {
  if this.gotInterrupt() {
   return ErrInterrupt
  }
  this.waitGroup.Add(1)
  go func(id int) {
   this.lock.Lock()
   //執(zhí)行任務(wù)
   err := task(id)
   //加鎖保存到結(jié)果集中
   this.errs = append(this.errs, err)
   this.lock.Unlock()
   this.waitGroup.Done()
  }(id)
 }
 this.waitGroup.Wait()
 return nil
}
 
//判斷是否接收到操作系統(tǒng)中斷信號
func (this *Runner) gotInterrupt() bool {
 select {
 case -this.interrupt:
  //停止接收別的信號
  signal.Stop(this.interrupt)
  return true
  //正常執(zhí)行
 default:
  return false
 }
}
 
//獲取執(zhí)行完的error
func (this *Runner) GetErrs() []error {
 return this.errs
}

使用方法    

Add添加一個(gè)任務(wù),任務(wù)為接收int類型,返回類型error的一個(gè)閉包

Start開始執(zhí)行傷,返回一個(gè)error類型,nil為執(zhí)行完畢, ErrTimeout代表執(zhí)行超時(shí),ErrInterrupt代表執(zhí)行被中斷(類似Ctrl + C操作)

getErrs獲取所有的任務(wù)執(zhí)行結(jié)果

測試示例代碼

package task
import (
 "testing"
 "time"
 "fmt"
 "os"
 "runtime"
)
 
func TestRunner_Start(t *testing.T) {
 //開啟多核心
 runtime.GOMAXPROCS(runtime.NumCPU())
 //創(chuàng)建runner對象,設(shè)置超時(shí)時(shí)間
 runner := NewRunner(18 * time.Second)
 //添加運(yùn)行的任務(wù)
 runner.Add(
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
 )
 fmt.Println("異步執(zhí)行任務(wù)")
 //開始執(zhí)行任務(wù)
 if err := runner.Start(); err != nil {
  switch err {
  case ErrTimeout:
   fmt.Println("執(zhí)行超時(shí)")
   os.Exit(1)
  case ErrInterrupt:
   fmt.Println("任務(wù)被中斷")
   os.Exit(2)
  }
 }
 t.Log("執(zhí)行結(jié)束")
 t.Log(runner.GetErrs())
}
 
//創(chuàng)建要執(zhí)行的任務(wù)
func createTask() func(id int) error {
 return func(id int) error {
  fmt.Printf("正在執(zhí)行%v個(gè)任務(wù)\n", id)
  //模擬任務(wù)執(zhí)行,sleep
  //time.Sleep(1 * time.Second)
  return nil
 }
}

執(zhí)行結(jié)果

異步執(zhí)行任務(wù)
正在執(zhí)行2個(gè)任務(wù)
正在執(zhí)行1個(gè)任務(wù)
正在執(zhí)行4個(gè)任務(wù)
正在執(zhí)行3個(gè)任務(wù)
正在執(zhí)行6個(gè)任務(wù)
正在執(zhí)行5個(gè)任務(wù)
正在執(zhí)行9個(gè)任務(wù)
正在執(zhí)行7個(gè)任務(wù)
正在執(zhí)行10個(gè)任務(wù)
正在執(zhí)行13個(gè)任務(wù)
正在執(zhí)行8個(gè)任務(wù)
正在執(zhí)行11個(gè)任務(wù)
正在執(zhí)行12個(gè)任務(wù)
正在執(zhí)行0個(gè)任務(wù)
 runner_test.go:49: 執(zhí)行結(jié)束
 runner_test.go:51: [nil> nil> nil> nil> nil> nil> nil> nil> nil> nil> nil> nil> nil> nil>]

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。

您可能感興趣的文章:
  • go語言同步教程之條件變量

標(biāo)簽:泰安 阿壩 昭通 滄州 瀘州 駐馬店 晉中 東營

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《Go語言同步與異步執(zhí)行多個(gè)任務(wù)封裝詳解(Runner和RunnerAsync)》,本文關(guān)鍵詞  語言,同步,與,異步,執(zhí)行,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《Go語言同步與異步執(zhí)行多個(gè)任務(wù)封裝詳解(Runner和RunnerAsync)》相關(guān)的同類信息!
  • 本頁收集關(guān)于Go語言同步與異步執(zhí)行多個(gè)任務(wù)封裝詳解(Runner和RunnerAsync)的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章