んと…いわゆる
・バラバラとタスクを不定期に詰め込んで
・定期的なバッチで「積みあがった」タスクをまりもりとこなす
ってのは、例えばAWSなんかだとSQSとかで提供されている機能なので、まぁある程度のニーズはあろうかと思われるのです。
ただ一方で「AWS使ってないときは?」とかってなったりいろいろとあるので。
微妙に習作的なニュアンスも込みで、作ってみようかなぁ、と。
目標としては、以下の通り。
・「複数の処理バッチ」が動いても安全:障害耐性がある程度ある
→「複数台のマシン」で動かしても問題ないようにする
・可能な限り「どんな処理でも乗る」ように、マージンは広めにとっておく
・シンプルにする
・常駐は、してもいいけど基本は「常駐させずに定期的にcronとかで呼ばれる」
こんな感じかなぁ、と。
突っ込みなどありましたらお待ちしております。
SQL部分
色々と考えていたのですが。
「バッチに渡すべきデータ」は、内部的な処理のみなので「serialize()で作った文字列」でよかんべではないかなぁ、と。
個人的には「配列を格納して配列を受け取る」事を推奨したいところなんだけど、まぁ「インスタンス突っ込むんならそれはそれで自己責任のもとによい」のではないかなぁ、と。
その辺を前提にすると。
最低限のDDLとしてはおそらく
CREATE TABEL message_queue ( message_queue_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '一意のID:ソート条件にも使う', data blob NOT NULL COMMENT '引き渡すデータ:中はなんでもよいような感じで(サイズが足りないことが懸念されるならでっかく)', status TINYINT UNSIGNED DEFAULT 0 COMMENT '処理ステータス。 0:未処理, 1:処理中', processing_at DATETIME COMMENT '処理開始時刻。statusが0の時はNULLが想定されている', error_count TINYINT UNSIGNED DEFAULT 0 COMMENT '処理時のエラー回数(10回くらいを最大に考えてるのでTINYINT)', create_at DATETIME NOT NULL COMMENT 'このキューの作成時刻', PRIMARY KEY (`message_queue_id`) )CHARACTER SET 'utf8mb4', ENGINE=InnoDB, COMMENT='1レコードが「1つのキュー」を意味するテーブル';
こんな感じ。
INSERTはたぶん、何も考えずに「INSERT文一発」で終了。
まぁエラー補足はしたほうがよいと思うので「うまくINSERTできなかった」らエラー処理、くらいかなぁ。
で、バッチ処理用に「キューを取得する」方法。
現状は、以下を想定している感じ。
BEGIN; SELECT * FROM message_queue WHERE status=0 AND error_count < 10 ORDER BY message_queue_id LIMIT 0,1 FOR UPDATE; if (SELECTがempty) { COMMIT; // ROLLBACKとどっちがよろしかんべか? プログラム終了; } UPDATE message_queue SET status=1, processing_at=now() WHERE message_queue_id=[上で取得したID]; COMMIT; 処理本体 if (処理でエラー発生) { UPDATE message_queue SET error_count=error_count+1, status=0 WHERE message_queue_id=[上で取得したID]; } else { DELETE message_queue WHERE message_queue_id=[上で取得したID]; (DELETEできなかったらテキストログにでも書き出しておく) }
これが基本。
トランザクションの中でがっつりロックが発生するんだけど「UPDATEしたらすぐに手放す」から、多少の競合が発生しても、重たいロックにはならんのではなかろうかなぁ、と。多分。
レコードは物理削除。
論理削除はお好まない感じなので。
あとは、定期的にガベコレ的な処理として
SELECT * FROM message_queue WHERE status=1 AND processing_at < 現在時刻から1時間前
とかって感じで捕捉しておくと。
ざっくりした予想だけど「上述のSQLに当てはまるような処理はたぶん"バッチが処理中に落ちてる"と思われる」ので。
「改めて別プロセスで処理(ざっくり系)」でもいいし「管理画面等にアラートを上げる(丁寧系)」でもいいし、お好みで。
同じくガベコレ的処理として
SELECT * FROM message_queue WHERE error_count >= 10;
があって、これは「何度も(今回のケースだと10回)投げたけど毎度毎度、処理がエラーになるよ?」なので、こいつは管理画面にアラート、じゃないかなぁ。
とまぁ、こんな風に作ったらあらかたいけるような気がしてる。
なんとなく。
プログラム部分
こちらで組む側はクラス作って。
仮に、クラス名を「mw_message_queue」と仮定して。
タスクを突っ込む時は
mw_message_queue::enqueue(mixed データ, PDO $dbh=NULL);
でよいかなぁ、と。
テーブル名とかその他諸々の可変になりうる変数は「メソッドで切り出し」しておいて「変えたかったら継承して子クラス作ってそっち呼んで」くらいの感じで。
第二引数についても「基本はいる」んだけど「継承した子クラスでget_dbhを実装していれば省略できる」風に作ろうかなぁ、と。
処理的には
static pub func enqueue(mixed $data, PDO $dbh=NULL) { // if (NULL === $dbh) { $dbh = static::get_dbh(); } // チェック if (NULL === $dbh) { // 適当にエラー吐いて終了 exit; } // 続く } // 親で実装してる空っぽなメソッド static protected func get_dbh() { return NULL; }
的なのをイメージ。
タスクの処理のほうは、考えているのが
mw_message_queue::dequeue(callable $callback, PDO $dbh=NULL);
これくらい。
細かい「1バッチで処理する回数の上限」とか「(1マシン内での)多重起動の制限個数」とかはメソッドで切り出しておいて以下略。
callableが意外と丁寧な処理が必要そうなので、ここはいろいろ想定。
関数の引数と戻り値については「bool 関数(unserialize()されたデータ);」ってフォーマットを想定……DBの項目でほかに欲しい情報とかあるかしらん???
処理的には
・文字列なら、可変関数としてcall
・Closureクラスインスタンス(無名関数)なら、そのままcall
・配列の場合
→[0]がインスタンスなら、->でcall
→[0]が文字の場合
→[0]のクラスの[1]のメソッドがstaticなら、::でcall
→[0]のクラスの[1]のメソッドがstaticでないなら、->でcall
で、おかしな値は一通り「エラーをゲロって終了」と。
この辺でなんとかなるはずなんだけどなぁ……たぶんこの実装は、MagicWeaponに吸収するwww
で、ガベコレ系についてもおそらくは
// 「仕掛中のまま止まってる」子の処理 mw_message_queue::gc_processing(callable $callback, PDO $dbh=NULL); // 「エラーで止まってる」子の処理 mw_message_queue::gc_error(callable $callback, PDO $dbh=NULL);
でよいかなぁ、と。
ざっくりした設計諸々。
突っ込みがあったらよろです。
なかったら、たぶんそのうち、実装しますw