[golang]簡素なバッチスケジューラを作りながらgoroutineに親しむ

今回は、Goの非同期処理に親しむためにタスクをバッチ実行する簡単なスケジューラを作ってみました。
その時の内容をこの記事にまとめて、社内勉強会で紹介します。

この記事内で使用している機能

  • goroutine
  • channel
  • sync

今回作成したバッチサーバの要件はこんな感じです。

  1. それぞれのタスクをgoroutineで並行処理する
  2. タスクを一定間隔で定期実行する
  3. 前のタスクが終わっていない場合は定期実行をスキップする

それではさっそく作っていきます。

まずは下記のようにJob Schedulerを定義します。

type Scheduler struct {
jobs []Job
wg *sync.WaitGroup
}

type Job struct {
exit atomic.Uint32
name string
function func()
duration time.Duration
quit chan bool
mu sync.Mutex
}

Job構造体にはそれぞれ実行したいタスクをfunctionとして関数を指定できます。

次に、一定間隔でタスクを実行するためのメソッドを作成します。

func (j *Job) do(wg *sync.WaitGroup) {
defer wg.Done()

ticker := time.NewTicker(j.duration)
for {
select {
case <-ticker.C:
if j.alive() {
j.mu.Lock()
j.function() // タスク実行
j.mu.Unlock()
}
case <-j.quit:
ticker.Stop()
return
}
}
}

selectは複数のチャネルを待ち受ける時に便利な機能です。

Tickerで定期的に信号を受け取り、タスクを実行しています。
Jobはそれぞれmutexを持っており、タスクを排他実行します。
mutexは排他ロックを簡単に実装できるものです。(https://pkg.go.dev/sync#Mutex
これによって前のタスクがまだ実行中の場合は次のタスクをスキップします。

また、quit信号を受け取ったときはtickerを止めてWaitGroupを実行済みにします。

続いてSchedulerの実行と停止のメソッドを作成します。

func (s *Scheduler) Run() {
for i := 0; i < len(s.jobs); i++ {
s.wg.Add(1)
go s.jobs[i].do(s.wg) // goroutineで実行
}
}

func (s *Scheduler) Stop(done chan bool) {
for i := 0; i < len(s.jobs); i++ {
s.jobs[i].terminate()
s.jobs[i].quit <- true
}

s.wg.Wait()
done <- true
}

SchedulerはWaitGroupを持っており、Stopメソッドが呼ばれた時は
実行中のJobの終了を待ち合わせてから停止完了の信号をチャネルdoneへ送信します。

あとは便利のため、周辺の機能をメソッドに切り出して作成しておきます。

func NewScheduler(j []Job) *Scheduler {
var wg sync.WaitGroup
return &Scheduler{j, &wg}
}

func NewJob(name string, f func(), d time.Duration) Job {
return Job{
name: name,
duration: d,
function: f,
quit: make(chan bool),
}
}

// Jobの停止を登録
func (j *Job) terminate() {
j.exit.Store(1)
}

// Jobに停止信号が送られていないか確認
func (j *Job) alive() bool {
return j.exit.Load() == 0
}

Jobの持っているexitにはsyncパッケージのatomic.Uint32を使っています。
https://pkg.go.dev/sync/atomic
メモリへのアトミックな読み書きができるので、非同期な処理の際はこちらを使った方がいいかなと思います。

ただし、atomicのドキュメントにも記載がある通り、使用には細心の注意が必要です。

These functions require great care to be used correctly. Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package. Share memory by communicating; don't communicate by sharing memory.

syncパッケージで事足りるのであればそちらを使うべきです。
今回はsyncパッケージのonce内でのatomicの使われ方を参考にしてみました。
https://cs.opensource.google/go/go/+/refs/tags/go1.21.0:src/sync/once.go

スケジューラの実装は以上です。
実行例は以下のようになります。

func main() {
jobs := []Job{
NewJob(
"JobA",
func() { fmt.Println("JobA") },
time.Second,
),
NewJob(
"JobB",
func() {
time.Sleep(3 * time.Second)
fmt.Println("JobB")
},
time.Second,
),
}

s := NewScheduler(jobs)
s.Run()

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

done := make(chan bool, 1)
s.Stop(done)
<-done
fmt.Println("server exiting.")
}

上記の例では、それぞれのタスクは毎秒実行されますが、JobBは実行に3秒かかる想定です。
Jobの実行中は重複実行されませんので、このJobは3秒ごとに”JobB”を出力することになります。

バッチサーバはCtrl+Cなどのシグナルを受け取るまで実行状態になります。
停止シグナルを受け取るとSchedulerのStopメソッドを実行し、
実行中のタスクが全て完了するのを待ち合わせてから終了します。

まとめ

goroutineを使うことで、非常に簡単に並行処理の動作を実現できます。
ただし、スレッド間の情報のやり取り(channel)は実行されるタイミングをよく検討しないと
うまく動作しないので、こちらには慣れる必要があります。

参考

goroutineやchannelなど並行処理について
https://go.dev/tour/concurrency/1

syncパッケージ
https://pkg.go.dev/sync

前へ

Lexical PlayGroundから仕組みを調べてみた

次へ

【ProseMirror】React(Next.js)で簡単にWYSIWYGマークダウンエディタを作成してみた。