がるの健忘録

エンジニアでゲーマーで講師で占い師なおいちゃんのブログです。

LaravelのMessage Queueを把握してみる:本体

さて、長い前説を前提にして、本題。
ようは「Message Queue」って文脈で必要なのは

・キューを積む場所
 → 積み方(enqueue
  → 電文フォーマット
 → 読み方(dequeue
・お仕事を消化するバッチ
 → 起動の仕方と終了の仕方
  → 寿命は持たせられるのか?
 → 「お仕事がない時」の挙動

の各要素となります。
ので、マニュアルからこの要素を抜き出して、ついでにちょろっと実験コードなど書いて検証をしてみませう。

ついでに用語をあわせていきませう。いったん https://readouble.com/laravel/8.x/ja/queues.html をベースにします。

先に結論から書くと

・電文フォーマットは「それ用のクラスインスタンスつくって、基本そのserialize+αをそのままキューに積む」
 → ので、処理は「それ用のクラス」に書いておく
・積んだキューを「読んで実行」は「全部Laravel側で隠蔽している」から、コードレベルでは基本「触らないし触れない」
 → ので、「実行時にあれこれしたい事」は全部「電文として乗せるクラスインスタンス上で表現したり書いたりしなきゃいけない」
 → だから、いわゆる「プロセス管理ツール(Laravelはsupervisorを推してる)」で外側から管理する必要がある

こんな感じ。
なので、旧来の「積むのはシンプル、がっつりした実装はお仕事消化バッチ側」って意識で触ろうとすると、混乱しかないので注意。

本体の、本題。

まずはキューの
・場所の用意
・積み方
について。

用語としての「キュー」は、そのままでOKっぽいです( https://readouble.com/laravel/8.x/ja/queues.html#connections-vs-queues )。
ただ一方で「イベント」って呼称も出てくるので、ちょいと意識しておくとよいです。

キューを積む場所は、ある程度任意に設定できるぽ( https://readouble.com/laravel/8.x/ja/queues.html#driver-prerequisites )。
一端、手短にできる「データベース」にしておきませう。

php artisan queue:table
php artisan migrate

QUEUE_CONNECTIONは、デフォが sync になってるぽいので、マニュアル通り database に変更しておきませう。

QUEUE_CONNECTION=database

テーブルレイアウトを軽く確認。

MariaDB [lara_test]> show create table jobs \G
*************************** 1. row ***************************
       Table: jobs
Create Table: CREATE TABLE `jobs` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `queue` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
  `payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL,
  `attempts` tinyint(3) unsigned NOT NULL,
  `reserved_at` int(10) unsigned DEFAULT NULL,
  `available_at` int(10) unsigned NOT NULL,
  `created_at` int(10) unsigned NOT NULL,
  PRIMARY KEY (`id`),
  KEY `jobs_queue_index` (`queue`(191))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
1 row in set (0.00 sec)

多分「payload」に電文が載るんだろうなぁ、とか軽く予想。

んで積み方&電文フォーマットについて。
どうも「専用のクラスを用意する必要がある」ぽい。
マニュアルにそって作ってみる。

php artisan make:job TestJob

ファイルは、app/Jobs/に出来るぽい。

./app/Jobs/TestJob.php

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class TestJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

んで。
このクラス(のインスタンス)が「メッセージ」になるし「実行時に動くコード」にもなるので(厳密には、(下でも書きますが)このクラスの handle()ってメソッドがdequeueされた時に動く)、このクラスに実装をガリゴリと書いていく事になります。
んで、このクラスのインスタンス自体を「キューに積む」形になります。
なので、イメージとしては「クラスがそのまま電文フォーマットになっている」感じ。

Laravelではenqueueの事を「ディスパッチ」と呼んでいるっぽいです。
なので、データはコンストラクタで設定します&そのコンストラクタへの値の設置はdispatch()(enqueueする)の引数で渡します。

例えば

    public function __construct(
        private string $s_hoge,
        private int $i_foo,
    )
    {
    }

ってコンストラクタを定義しておいて

TestJob::dispatch('string', 999);

って使うと
・コンストラクタの引数として 'string', 999 が渡されて
・キューにenqueueされる
ことになります。

なお、dispatch() メソッドの戻り値には Illuminate\Foundation\Bus\PendingDispatch インスタンスがかえってきていました(中は細かく見てません)。
DBの中身

MariaDB [lara_test]> select * from jobs \G
*************************** 1. row ***************************
          id: 1
       queue: default
     payload: {"uuid":"2ccc212d-ebd8-4033-9550-e3364902745a","displayName":"App\\Jobs\\TestJob","job":"Illuminate\\Queue\\CallQueuedHandler@call","maxTries":null,"maxExceptions":null,"failOnTimeout":false,"backoff":null,"timeout":null,"retryUntil":null,"data":{"commandName":"App\\Jobs\\TestJob","command":"O:16:\"App\\Jobs\\TestJob\":12:{s:24:\"\u0000App\\Jobs\\TestJob\u0000s_hoge\";s:6:\"string\";s:23:\"\u0000App\\Jobs\\TestJob\u0000i_foo\";i:999;s:3:\"job\";N;s:10:\"connection\";N;s:5:\"queue\";N;s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:19:\"chainCatchCallbacks\";N;s:5:\"delay\";N;s:11:\"afterCommit\";N;s:10:\"middleware\";a:0:{}s:7:\"chained\";a:0:{}}"}}
    attempts: 0
 reserved_at: NULL
available_at: 1653999999
  created_at: 1653999999

ん……「created_atがintだよをい!!」ってのはおいといて*1
予想通り、payloadに電文相当が入ってる感じですねぇ。
多分、ポイントは

	"data": {
		"commandName": "App\\Jobs\\TestJob",
		"command": "O:16:\"App\\Jobs\\TestJob\":12:{s:24:\"\u0000App\\Jobs\\TestJob\u0000s_hoge\";s:6:\"string\";s:23:\"\u0000App\\Jobs\\TestJob\u0000i_foo\";i:999;s:3:\"job\";N;s:10:\"connection\";N;s:5:\"queue\";N;s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:19:\"chainCatchCallbacks\";N;s:5:\"delay\";N;s:11:\"afterCommit\";N;s:10:\"middleware\";a:0:{}s:7:\"chained\";a:0:{}}"
	}

この辺り。「jsonの中にserializeかね」とは思うんだけど、まぁ「実行プログラムをそのままクラスインスタンスにしてそれを電文として乗っける」んなら、こーなるわなぁ*2

まぁなのでとりあえず「積めた」。
旧来だとここから「実際に、お仕事を実行して消化する」バッチを作るのが、仕組み含めてメインイベントだし大仕事なのですが。
Laravelはそれを完全に隠蔽しているので、基本、このコマンドを叩くだけ。

php artisan queue:work

そうすると「先ほど作成したクラスインスタンスの、 handle() メソッドが実行される(ことをもってバッチの実行とする)」って動きになります以上終了。
んで、お仕事がない時も「ず~っと、"php artisan queue:work"は動きっぱなし」なので。基本的には「一度起動したらあとは放置」って感じになります。

なのでまぁ「読み方」「起動と終了の仕方」「お仕事がない時」については、「php artisan queue:workを一度起動したらあとは放置」となります。

ここまでが基本形。
なので、一端「ここまでの理解」を、実装なり実験なりするとよいんじゃないかなぁ、と思われます。

んで、実際の実装の時はもうちょっと色々あるので……面倒なんで詳細は省きますが、「キューを複数種類、名前付けて」とか「ジョブを一意に保つ(セマフォで多重度1、ってことかしらん? マニュアルにあるけどよぉわからん)」とか「エラー時の最大試行回数」とか「エラー時の時間ベースの試行」とか「タイムアウトの設定」とか、その辺はまぁなんか出来るっぽいです。
あとは、試してないんですがおそらく「"php artisan queue:work" を2プロセス立ち上げる」事で、実質「多重度2の処理」が出来るんだろうなぁ、と。

ただ、「workのプロセスを常にn個に保つ」ようにするのには、マニュアルの通りだとすると Supervisor が別途必要なようです(daemontools はちょろっと使ってたんだけどねぇ……「1ファイルをnプロセス起動」とかの設定はSupervisorのほうが楽なのか)。
これ、ちゃんと「Supervisorの設定ファイルもコード管理とかに含めてデプロイとかのルートに乗せて」しないと、なんか齟齬りそうな気がせんでもないんだよなぁ……とか思ってみたり、は、ちょろっと。「一度プロセス落として再起動」とかは楽だから、一長一短な気もするんだけど。
あと、キューがn種類あると、n個の設定ファイルちゃんと書かないと駄目だよねぇ、とか(まぁこの辺は些細なところかな)。


というわけで、調べてみたところ、「昔からこの辺を作成している」人にとっては、結構な齟齬がありました。
端的には「実作業側のバッチが、ほぼ全くアタッチできない」ところと、それに伴って「enqueueはあるのにdequeueがない(隠蔽されている)」あたりが、大っきな違和感かなぁ。
片側として「わからんでもない」んだけど、せめて「触れる余地と隙間」くらいは作っておいて欲しかったなぁ……とか思うんだけど、まぁその辺がないのが「万人向け」なのかもなぁ、とか、思ってみたりしなくもないです。

ただまぁ、その辺踏まえておかないと色々と大変そうだなぁ、と思ったので、主に備忘録用に記載しておきます。


おまけ

旧来の「Message Queueバッチ」の作り方雑版
・事前に電文フォーマットは決めておく(パースが楽なのがいいよ)
・enqueue側は「所定のDBその他に文字列を送る」で終了
・dequeue側は「所定のDBから1レコード取得して電文をパースして処理」
 → 多重制御とかプロセス起動とかは「cronで毎分起動」「セマフォで多重制御」すると楽だよ

Laravelの「Message Queueバッチ」の作り方雑版
・事前に「キュー情報」が入れられる場所を作ろう。DBなら php artisan queue:table で作れるよ
・enqueue側は php artisan make:job クラス名 でクラス作って、処理はそのクラスの handle() メソッドに書いておこう。データはコンストラクタで渡してね
 実際にenqueueする時は クラス名::::dispatch(コンストラクタへの引数) ってやるとenqueueできるよ
・dequeue側は php artisan queue:work を叩くだけ。実装は不要だよ
 → 「処理実行バッチ側でなにかをやりたい」時は、Laravelが提供しているかどうかを調べてみよう(してなかったら、無理または茨の道だよ)
 → プロセス起動などは Supervisor っていうプロセス管理ツールを別途使うことを勧められてるよ

*1:更なる余談としては「BIGINTじゃなくてINTなのか……2038年問題に引っかかるな」ってあたりが、色々と色々。Laravel、2038年問題はわりと「一切合切、無視している」よねぇ。なんでだろ?

*2:いや「全体的にserializeでよくね?」とは思うんだが、まぁ