gallu’s blog

エンジニアでゲーマーで講師で占い師なおいちゃんのブログです。

簡単な「メッセージキュー」の仕組みを作ろうかなぁ……

んと…いわゆる
・バラバラとタスクを不定期に詰め込んで
・定期的なバッチで「積みあがった」タスクをまりもりとこなす
ってのは、例えばAWSなんかだとSQSとかで提供されている機能なので、まぁある程度のニーズはあろうかと思われるのです。


ただ一方で「AWS使ってないときは?」とかってなったりいろいろとあるので。
微妙に習作的なニュアンスも込みで、作ってみようかなぁ、と。


目標としては、以下の通り。
・「複数の処理バッチ」が動いても安全:障害耐性がある程度ある
 →「複数台のマシン」で動かしても問題ないようにする
・可能な限り「どんな処理でも乗る」ように、マージンは広めにとっておく
・シンプルにする
・常駐は、してもいいけど基本は「常駐させずに定期的にcronとかで呼ばれる」


こんな感じかなぁ、と。
突っ込みなどありましたらお待ちしております。

SQL部分

色々と考えていたのですが。
「バッチに渡すべきデータ」は、内部的な処理のみなので「serialize()で作った文字列」でよかんべではないかなぁ、と。
個人的には「配列を格納して配列を受け取る」事を推奨したいところなんだけど、まぁ「インスタンス突っ込むんならそれはそれで自己責任のもとによい」のではないかなぁ、と。


その辺を前提にすると。
最低限のDDLとしてはおそらく

CREATE TABEL message_queue (
    message_queue_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '一意のID:ソート条件にも使う',
    data blob NOT NULL COMMENT '引き渡すデータ:中はなんでもよいような感じで(サイズが足りないことが懸念されるならでっかく)',
    status TINYINT UNSIGNED DEFAULT 0 COMMENT '処理ステータス。 0:未処理, 1:処理中',
    processing_at DATETIME COMMENT '処理開始時刻。statusが0の時はNULLが想定されている',
    error_count TINYINT UNSIGNED DEFAULT 0 COMMENT '処理時のエラー回数(10回くらいを最大に考えてるのでTINYINT)',
    create_at DATETIME NOT NULL COMMENT 'このキューの作成時刻',
    PRIMARY KEY (`message_queue_id`)
)CHARACTER SET 'utf8mb4', ENGINE=InnoDB, COMMENT='1レコードが「1つのキュー」を意味するテーブル';

こんな感じ。


INSERTはたぶん、何も考えずに「INSERT文一発」で終了。
まぁエラー補足はしたほうがよいと思うので「うまくINSERTできなかった」らエラー処理、くらいかなぁ。


で、バッチ処理用に「キューを取得する」方法。
現状は、以下を想定している感じ。

BEGIN;
SELECT * FROM message_queue WHERE status=0 AND error_count < 10 ORDER BY message_queue_id LIMIT 0,1 FOR UPDATE;
if (SELECTがempty) {
    COMMIT; // ROLLBACKとどっちがよろしかんべか?
    プログラム終了;
}
UPDATE message_queue SET status=1, processing_at=now() WHERE message_queue_id=[上で取得したID];
COMMIT;


処理本体


if (処理でエラー発生) {
    UPDATE message_queue SET error_count=error_count+1, status=0 WHERE message_queue_id=[上で取得したID];
} else {
    DELETE message_queue WHERE message_queue_id=[上で取得したID];
    (DELETEできなかったらテキストログにでも書き出しておく)
}

これが基本。
トランザクションの中でがっつりロックが発生するんだけど「UPDATEしたらすぐに手放す」から、多少の競合が発生しても、重たいロックにはならんのではなかろうかなぁ、と。多分。


レコードは物理削除。
論理削除はお好まない感じなので。


あとは、定期的にガベコレ的な処理として

SELECT * FROM message_queue WHERE status=1 AND processing_at < 現在時刻から1時間前

とかって感じで捕捉しておくと。
ざっくりした予想だけど「上述のSQLに当てはまるような処理はたぶん"バッチが処理中に落ちてる"と思われる」ので。
「改めて別プロセスで処理(ざっくり系)」でもいいし「管理画面等にアラートを上げる(丁寧系)」でもいいし、お好みで。


同じくガベコレ的処理として

SELECT * FROM message_queue WHERE error_count >= 10;

があって、これは「何度も(今回のケースだと10回)投げたけど毎度毎度、処理がエラーになるよ?」なので、こいつは管理画面にアラート、じゃないかなぁ。


とまぁ、こんな風に作ったらあらかたいけるような気がしてる。
なんとなく。

プログラム部分

こちらで組む側はクラス作って。
仮に、クラス名を「mw_message_queue」と仮定して。


タスクを突っ込む時は

mw_message_queue::enqueue(mixed データ, PDO $dbh=NULL);

でよいかなぁ、と。
テーブル名とかその他諸々の可変になりうる変数は「メソッドで切り出し」しておいて「変えたかったら継承して子クラス作ってそっち呼んで」くらいの感じで。
第二引数についても「基本はいる」んだけど「継承した子クラスでget_dbhを実装していれば省略できる」風に作ろうかなぁ、と。
処理的には

static pub func enqueue(mixed $data, PDO $dbh=NULL) {
    //
    if (NULL === $dbh) {
        $dbh = static::get_dbh();
    }
    // チェック
    if (NULL === $dbh) {
        // 適当にエラー吐いて終了
        exit;
    }
    // 続く

}
// 親で実装してる空っぽなメソッド
static protected func get_dbh() {
    return NULL;
}

的なのをイメージ。


タスクの処理のほうは、考えているのが

mw_message_queue::dequeue(callable $callback, PDO $dbh=NULL);

これくらい。
細かい「1バッチで処理する回数の上限」とか「(1マシン内での)多重起動の制限個数」とかはメソッドで切り出しておいて以下略。


callableが意外と丁寧な処理が必要そうなので、ここはいろいろ想定。
関数の引数と戻り値については「bool 関数(unserialize()されたデータ);」ってフォーマットを想定……DBの項目でほかに欲しい情報とかあるかしらん???


処理的には
・文字列なら、可変関数としてcall
・Closureクラスインスタンス(無名関数)なら、そのままcall
・配列の場合
 →[0]がインスタンスなら、->でcall
 →[0]が文字の場合
  →[0]のクラスの[1]のメソッドがstaticなら、::でcall
  →[0]のクラスの[1]のメソッドがstaticでないなら、->でcall
で、おかしな値は一通り「エラーをゲロって終了」と。
この辺でなんとかなるはずなんだけどなぁ……たぶんこの実装は、MagicWeaponに吸収するwww


で、ガベコレ系についてもおそらくは

// 「仕掛中のまま止まってる」子の処理
mw_message_queue::gc_processing(callable $callback, PDO $dbh=NULL);
// 「エラーで止まってる」子の処理
mw_message_queue::gc_error(callable $callback, PDO $dbh=NULL);

でよいかなぁ、と。


ざっくりした設計諸々。
突っ込みがあったらよろです。
なかったら、たぶんそのうち、実装しますw