mirror of
https://github.com/misskey-dev/misskey.git
synced 2026-06-25 17:10:43 +00:00
Merge f647329ed0 into 079ec865e0
This commit is contained in:
commit
fa42611c8d
23 changed files with 329 additions and 16 deletions
|
|
@ -34,6 +34,7 @@
|
|||
- Fix: 画像アップロード時、フレームのキャプション付与が正しく行われないことがある問題を修正
|
||||
|
||||
### Server
|
||||
- Fix: 管理画面で `enableFanoutTimeline` を切り替えるとタイムラインにギャップが生じノートが取りこぼされる問題を修正 (過渡期フラグ `fanoutTimelineActive` を導入し、トグル中は BullMQ ジョブで Redis 上の `list:*` をパージしてからデータプレーンを切り替える方式に変更。過渡期中はデータプレーンが FTTL を一切使用せず DB 直行となり、過渡期中の `enableFanoutTimeline` 再変更は 409 で拒否される)
|
||||
- Enhance: リモートノートクリーニングジョブのスキップ処理のパフォーマンス改善
|
||||
- Enhance: リモートノートクリーニングジョブの削除対象検索処理のパフォーマンス改善
|
||||
- Enhance: ActivityPub の画像添付に width/height を含めるように
|
||||
|
|
|
|||
|
|
@ -1760,6 +1760,7 @@ _serverSettings:
|
|||
shortName: "略称"
|
||||
shortNameDescription: "サーバーの正式名称が長い場合に、代わりに表示することのできる略称や通称。"
|
||||
fanoutTimelineDescription: "有効にすると、各種タイムラインを取得する際のパフォーマンスが大幅に向上し、データベースへの負荷を軽減することが可能です。ただし、Redisのメモリ使用量は増加します。サーバーのメモリ容量が少ない場合、または動作が不安定な場合は無効にすることができます。"
|
||||
fanoutTimelineTransitionInProgress: "切り替えを適用中です。完了するまでこのトグルは操作できません。適用中、タイムラインは一時的にデータベースから取得されます。"
|
||||
fanoutTimelineDbFallback: "データベースへのフォールバック"
|
||||
fanoutTimelineDbFallbackDescription: "有効にすると、タイムラインがキャッシュされていない場合にDBへ追加で問い合わせを行うフォールバック処理を行います。無効にすると、フォールバック処理を行わないことでさらにサーバーの負荷を軽減することができますが、タイムラインが取得できる範囲に制限が生じます。"
|
||||
reactionsBufferingDescription: "有効にすると、リアクション作成時のパフォーマンスが大幅に向上し、データベースへの負荷を軽減することが可能です。ただし、Redisのメモリ使用量は増加します。"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* SPDX-FileCopyrightText: syuilo and misskey-project
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
export class AddFanoutTimelineActive1780626317299 {
|
||||
name = 'AddFanoutTimelineActive1780626317299';
|
||||
|
||||
/**
|
||||
* @param {QueryRunner} queryRunner
|
||||
*/
|
||||
async up(queryRunner) {
|
||||
await queryRunner.query('ALTER TABLE "meta" ADD "fanoutTimelineActive" boolean NOT NULL DEFAULT true');
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {QueryRunner} queryRunner
|
||||
*/
|
||||
async down(queryRunner) {
|
||||
await queryRunner.query('ALTER TABLE "meta" DROP COLUMN "fanoutTimelineActive"');
|
||||
}
|
||||
};
|
||||
|
|
@ -113,6 +113,31 @@ export class FanoutTimelineService {
|
|||
return this.redisForTimelines.del('list:' + name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Redis 上のすべてのタイムラインリスト (`list:*`) を一括削除する。
|
||||
* `enableFanoutTimeline` の有効/無効を切り替えた直後など、Redis 上のキャッシュが
|
||||
* DB と整合しなくなる可能性があるタイミングで呼ぶことを想定。
|
||||
* 削除後は各タイムライン取得時にFanoutTimelineEndpointServiceが `noteIds.length === 0`
|
||||
* を検知してDB直行するため、結果としてDBから自然に再構築される。
|
||||
*/
|
||||
@bindThis
|
||||
public async purgeAll(): Promise<number> {
|
||||
let cursor = '0';
|
||||
let totalDeleted = 0;
|
||||
do {
|
||||
const [next, keys] = await this.redisForTimelines.scan(cursor, 'MATCH', this.redisForTimelines.options.keyPrefix + 'list:*', 'COUNT', 100);
|
||||
cursor = next;
|
||||
if (keys.length === 0) continue;
|
||||
// ioredis の keyPrefix は SCAN で返るキーには含まれているが DEL 渡し時に
|
||||
// 二重に付加されるのを防ぐため prefix を剥がして渡す
|
||||
const prefix = this.redisForTimelines.options.keyPrefix ?? '';
|
||||
const stripped = prefix !== '' ? keys.map(k => k.startsWith(prefix) ? k.slice(prefix.length) : k) : keys;
|
||||
await this.redisForTimelines.del(...stripped);
|
||||
totalDeleted += stripped.length;
|
||||
} while (cursor !== '0');
|
||||
return totalDeleted;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public remove(name: FanoutTimelineName, id: string) {
|
||||
return this.redisForTimelines.lrem('list:' + name, 1, id);
|
||||
|
|
|
|||
|
|
@ -1045,7 +1045,9 @@ export class NoteCreateService implements OnApplicationShutdown {
|
|||
|
||||
@bindThis
|
||||
private async pushToTl(note: MiNote, user: { id: MiUser['id']; host: MiUser['host']; }) {
|
||||
if (!this.meta.enableFanoutTimeline) return;
|
||||
// fanoutTimelineActive=false は admin がトグルを切り替えた直後の過渡期。
|
||||
// その間は FTTL への push を止め、データプレーンは DB のみで動かす。
|
||||
if (!this.meta.enableFanoutTimeline || !this.meta.fanoutTimelineActive) return;
|
||||
|
||||
const r = this.redisForTimelines.pipeline();
|
||||
|
||||
|
|
|
|||
|
|
@ -546,6 +546,30 @@ export class QueueService {
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* `enableFanoutTimeline` トグル時に Redis 上の list:* キャッシュを一掃するジョブ。
|
||||
* 完了後に `MetaService.update({ fanoutTimelineActive: targetState })` で
|
||||
* データプレーン側のスイッチを入れ直す。
|
||||
* 固定 jobId にしているので、過渡期中の二重 enqueue は BullMQ 側で抑止される。
|
||||
*/
|
||||
@bindThis
|
||||
public createPurgeFanoutTimelinesJob(targetState: boolean) {
|
||||
return this.systemQueue.add('purgeFanoutTimelines', { targetState }, {
|
||||
jobId: 'purgeFanoutTimelines',
|
||||
attempts: 3,
|
||||
backoff: {
|
||||
type: 'exponential',
|
||||
delay: 5000,
|
||||
},
|
||||
removeOnComplete: {
|
||||
age: 3600 * 24 * 7, // keep up to 7 days
|
||||
},
|
||||
removeOnFail: {
|
||||
age: 3600 * 24 * 7, // keep up to 7 days
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) {
|
||||
return this.dbQueue.add('deleteAccount', {
|
||||
|
|
|
|||
|
|
@ -579,6 +579,17 @@ export class MiMeta {
|
|||
})
|
||||
public enableFanoutTimeline: boolean;
|
||||
|
||||
/**
|
||||
* `enableFanoutTimeline` をトグルした直後の過渡期 (Redis 上の list:* キャッシュを
|
||||
* BullMQ ジョブで purge している間) は false になり、データプレーンは FTTL を一切
|
||||
* 読み書きしない (= OFF と同じ挙動)。purge ジョブ完了で true に戻る。
|
||||
* 過渡期中の `enableFanoutTimeline` 変更は admin endpoint 側で 409 で拒否する。
|
||||
*/
|
||||
@Column('boolean', {
|
||||
default: true,
|
||||
})
|
||||
public fanoutTimelineActive: boolean;
|
||||
|
||||
@Column('boolean', {
|
||||
default: true,
|
||||
})
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import { CleanChartsProcessorService } from './processors/CleanChartsProcessorSe
|
|||
import { CleanProcessorService } from './processors/CleanProcessorService.js';
|
||||
import { CheckModeratorsActivityProcessorService } from './processors/CheckModeratorsActivityProcessorService.js';
|
||||
import { CleanRemoteNotesProcessorService } from './processors/CleanRemoteNotesProcessorService.js';
|
||||
import { PurgeFanoutTimelinesProcessorService } from './processors/PurgeFanoutTimelinesProcessorService.js';
|
||||
import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesProcessorService.js';
|
||||
import { DeleteAccountProcessorService } from './processors/DeleteAccountProcessorService.js';
|
||||
import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js';
|
||||
|
|
@ -87,6 +88,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor
|
|||
CheckExpiredMutingsProcessorService,
|
||||
CheckModeratorsActivityProcessorService,
|
||||
CleanRemoteNotesProcessorService,
|
||||
PurgeFanoutTimelinesProcessorService,
|
||||
QueueProcessorService,
|
||||
],
|
||||
exports: [
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ import { BakeBufferedReactionsProcessorService } from './processors/BakeBuffered
|
|||
import { CleanProcessorService } from './processors/CleanProcessorService.js';
|
||||
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
|
||||
import { CleanRemoteNotesProcessorService } from './processors/CleanRemoteNotesProcessorService.js';
|
||||
import { PurgeFanoutTimelinesProcessorService } from './processors/PurgeFanoutTimelinesProcessorService.js';
|
||||
import { QueueLoggerService } from './QueueLoggerService.js';
|
||||
import { QUEUE, baseWorkerOptions } from './const.js';
|
||||
|
||||
|
|
@ -127,6 +128,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
private checkModeratorsActivityProcessorService: CheckModeratorsActivityProcessorService,
|
||||
private cleanProcessorService: CleanProcessorService,
|
||||
private cleanRemoteNotesProcessorService: CleanRemoteNotesProcessorService,
|
||||
private purgeFanoutTimelinesProcessorService: PurgeFanoutTimelinesProcessorService,
|
||||
) {
|
||||
this.logger = this.queueLoggerService.logger;
|
||||
|
||||
|
|
@ -176,6 +178,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
case 'checkModeratorsActivity': return this.checkModeratorsActivityProcessorService.process();
|
||||
case 'clean': return this.cleanProcessorService.process();
|
||||
case 'cleanRemoteNotes': return this.cleanRemoteNotesProcessorService.process(job);
|
||||
case 'purgeFanoutTimelines': return this.purgeFanoutTimelinesProcessorService.process(job);
|
||||
default: throw new Error(`unrecognized job type ${job.name} for system`);
|
||||
}
|
||||
};
|
||||
|
|
|
|||
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* SPDX-FileCopyrightText: syuilo and misskey-project
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import type Logger from '@/logger.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { FanoutTimelineService } from '@/core/FanoutTimelineService.js';
|
||||
import { MetaService } from '@/core/MetaService.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
import type * as Bull from 'bullmq';
|
||||
|
||||
@Injectable()
|
||||
export class PurgeFanoutTimelinesProcessorService {
|
||||
private logger: Logger;
|
||||
|
||||
constructor(
|
||||
private fanoutTimelineService: FanoutTimelineService,
|
||||
private metaService: MetaService,
|
||||
private queueLoggerService: QueueLoggerService,
|
||||
) {
|
||||
this.logger = this.queueLoggerService.logger.createSubLogger('purge-fanout-timelines');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<{ targetState: boolean }>): Promise<void> {
|
||||
const { targetState } = job.data;
|
||||
this.logger.info(`Purging fanout timelines (targetState=${targetState})...`);
|
||||
|
||||
const deleted = await this.fanoutTimelineService.purgeAll();
|
||||
await this.metaService.update({ fanoutTimelineActive: targetState });
|
||||
|
||||
this.logger.succ(`Purged ${deleted} fanout timeline keys. fanoutTimelineActive=${targetState}`);
|
||||
}
|
||||
}
|
||||
|
|
@ -474,7 +474,7 @@ export class ActivityPubServerService {
|
|||
const partOf = `${this.config.url}/users/${userId}/outbox`;
|
||||
|
||||
if (page) {
|
||||
const notes = this.meta.enableFanoutTimeline ? await this.fanoutTimelineEndpointService.getMiNotes({
|
||||
const notes = (this.meta.enableFanoutTimeline && this.meta.fanoutTimelineActive) ? await this.fanoutTimelineEndpointService.getMiNotes({
|
||||
sinceId: sinceId ?? null,
|
||||
untilId: untilId ?? null,
|
||||
limit: limit,
|
||||
|
|
|
|||
|
|
@ -379,6 +379,10 @@ export const meta = {
|
|||
type: 'boolean',
|
||||
optional: false, nullable: false,
|
||||
},
|
||||
fanoutTimelineActive: {
|
||||
type: 'boolean',
|
||||
optional: false, nullable: false,
|
||||
},
|
||||
enableFanoutTimelineDbFallback: {
|
||||
type: 'boolean',
|
||||
optional: false, nullable: false,
|
||||
|
|
@ -725,6 +729,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
policies: { ...DEFAULT_POLICIES, ...instance.policies },
|
||||
manifestJsonOverride: instance.manifestJsonOverride,
|
||||
enableFanoutTimeline: instance.enableFanoutTimeline,
|
||||
fanoutTimelineActive: instance.fanoutTimelineActive,
|
||||
enableFanoutTimelineDbFallback: instance.enableFanoutTimelineDbFallback,
|
||||
perLocalUserUserTimelineCacheMax: instance.perLocalUserUserTimelineCacheMax,
|
||||
perRemoteUserUserTimelineCacheMax: instance.perRemoteUserUserTimelineCacheMax,
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ import type { MiMeta } from '@/models/Meta.js';
|
|||
import { ModerationLogService } from '@/core/ModerationLogService.js';
|
||||
import { Endpoint } from '@/server/api/endpoint-base.js';
|
||||
import { MetaService } from '@/core/MetaService.js';
|
||||
import { QueueService } from '@/core/QueueService.js';
|
||||
import { ApiError } from '@/server/api/error.js';
|
||||
|
||||
export const meta = {
|
||||
tags: ['admin'],
|
||||
|
|
@ -16,6 +18,15 @@ export const meta = {
|
|||
requireCredential: true,
|
||||
requireAdmin: true,
|
||||
kind: 'write:admin:meta',
|
||||
|
||||
errors: {
|
||||
fanoutTimelineTransitionInProgress: {
|
||||
message: 'A fanout timeline toggle is currently being applied. Please wait until the purge job completes before changing enableFanoutTimeline again.',
|
||||
code: 'FANOUT_TIMELINE_TRANSITION_IN_PROGRESS',
|
||||
id: '65b9b26a-700c-41ea-96f7-96b9a7902301',
|
||||
httpStatusCode: 409,
|
||||
},
|
||||
},
|
||||
} as const;
|
||||
|
||||
export const paramDef = {
|
||||
|
|
@ -230,6 +241,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
|
||||
private metaService: MetaService,
|
||||
private moderationLogService: ModerationLogService,
|
||||
private queueService: QueueService,
|
||||
) {
|
||||
super(meta, paramDef, async (ps, me) => {
|
||||
const set = {} as Partial<MiMeta>;
|
||||
|
|
@ -648,10 +660,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
set.manifestJsonOverride = ps.manifestJsonOverride;
|
||||
}
|
||||
|
||||
if (ps.enableFanoutTimeline !== undefined) {
|
||||
set.enableFanoutTimeline = ps.enableFanoutTimeline;
|
||||
}
|
||||
|
||||
if (ps.enableFanoutTimelineDbFallback !== undefined) {
|
||||
set.enableFanoutTimelineDbFallback = ps.enableFanoutTimelineDbFallback;
|
||||
}
|
||||
|
|
@ -764,11 +772,32 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
|
||||
const before = await this.metaService.fetch(true);
|
||||
|
||||
// enableFanoutTimeline をトグルする場合、Redis 上の list:* キャッシュを BullMQ
|
||||
// ジョブで一括 purge してから fanoutTimelineActive=true に戻す。過渡期 (active=false)
|
||||
// 中はデータプレーンが FTTL を読み書きしないため、push と purge の競合・MetaService
|
||||
// キャッシュ伝播ラグ・大規模インスタンスでの HTTP タイムアウト等を一掃できる。
|
||||
// 過渡期中に enableFanoutTimeline をさらに変更するリクエストは 409 で拒否する。
|
||||
if (ps.enableFanoutTimeline !== undefined) {
|
||||
if (ps.enableFanoutTimeline !== before.enableFanoutTimeline) {
|
||||
// 過渡期 = enableFanoutTimeline と fanoutTimelineActive が乖離している状態
|
||||
// (purge ジョブ進行中)。stable な OFF/OFF や ON/ON からのトグルは受理する。
|
||||
if (before.enableFanoutTimeline !== before.fanoutTimelineActive) {
|
||||
throw new ApiError(meta.errors.fanoutTimelineTransitionInProgress);
|
||||
}
|
||||
set.fanoutTimelineActive = false;
|
||||
}
|
||||
set.enableFanoutTimeline = ps.enableFanoutTimeline;
|
||||
}
|
||||
|
||||
await this.metaService.update(set);
|
||||
|
||||
const after = await this.metaService.fetch(true);
|
||||
|
||||
this.moderationLogService.log(me, 'updateServerSettings', {
|
||||
if (before.enableFanoutTimeline !== after.enableFanoutTimeline) {
|
||||
await this.queueService.createPurgeFanoutTimelinesJob(after.enableFanoutTimeline);
|
||||
}
|
||||
|
||||
await this.moderationLogService.log(me, 'updateServerSettings', {
|
||||
before,
|
||||
after,
|
||||
});
|
||||
|
|
|
|||
|
|
@ -88,7 +88,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
|
||||
if (me) this.activeUsersChart.read(me);
|
||||
|
||||
if (!this.serverSettings.enableFanoutTimeline) {
|
||||
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
|
||||
return await this.noteEntityService.packMany(await this.getFromDb({ untilId, sinceId, limit: ps.limit, channelId: channel.id }, me), me);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
|
||||
if (ps.withReplies && ps.withFiles) throw new ApiError(meta.errors.bothWithRepliesAndWithFiles);
|
||||
|
||||
if (!this.serverSettings.enableFanoutTimeline) {
|
||||
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
|
||||
const timeline = await this.getFromDb({
|
||||
untilId,
|
||||
sinceId,
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
|
||||
if (ps.withReplies && ps.withFiles) throw new ApiError(meta.errors.bothWithRepliesAndWithFiles);
|
||||
|
||||
if (!this.serverSettings.enableFanoutTimeline) {
|
||||
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
|
||||
const timeline = await this.getFromDb({
|
||||
untilId,
|
||||
sinceId,
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
const untilId = ps.untilId ?? (ps.untilDate ? this.idService.gen(ps.untilDate!) : null);
|
||||
const sinceId = ps.sinceId ?? (ps.sinceDate ? this.idService.gen(ps.sinceDate!) : null);
|
||||
|
||||
if (!this.serverSettings.enableFanoutTimeline) {
|
||||
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
|
||||
const timeline = await this.getFromDb({
|
||||
untilId,
|
||||
sinceId,
|
||||
|
|
|
|||
|
|
@ -100,7 +100,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
throw new ApiError(meta.errors.noSuchList);
|
||||
}
|
||||
|
||||
if (!this.serverSettings.enableFanoutTimeline) {
|
||||
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
|
||||
const timeline = await this.getFromDb(list, {
|
||||
untilId,
|
||||
sinceId,
|
||||
|
|
|
|||
|
|
@ -100,7 +100,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
}
|
||||
}
|
||||
|
||||
if (!this.serverSettings.enableFanoutTimeline) {
|
||||
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
|
||||
const timeline = await this.getFromDb({
|
||||
untilId,
|
||||
sinceId,
|
||||
|
|
|
|||
|
|
@ -9,13 +9,14 @@
|
|||
// pnpm jest -- e2e/timelines.ts
|
||||
|
||||
import * as assert from 'assert';
|
||||
import { describe, beforeAll, test } from 'vitest';
|
||||
import { describe, beforeAll, afterAll, test } from 'vitest';
|
||||
import { setTimeout } from 'node:timers/promises';
|
||||
import { entities } from 'misskey-js';
|
||||
import { Redis } from 'ioredis';
|
||||
import { SignupResponse, Note } from 'misskey-js/entities.js';
|
||||
import { api, initTestDb, post, randomString, sendEnvUpdateRequest, signup, uploadUrl, UserToken } from '../utils.js';
|
||||
import { api, initTestDb, post, randomString, sendEnvUpdateRequest, signup, startJobQueue, uploadUrl, UserToken } from '../utils.js';
|
||||
import { loadConfig } from '@/config.js';
|
||||
import type { INestApplicationContext } from '@nestjs/common';
|
||||
|
||||
function genHost() {
|
||||
return randomString() + '.example.com';
|
||||
|
|
@ -3326,4 +3327,149 @@ describe('Timelines', () => {
|
|||
// TODO: リノートミュート済みユーザーのテスト
|
||||
// TODO: ページネーションのテスト
|
||||
});
|
||||
|
||||
// `enableFanoutTimeline` を ON → OFF → ON のように切り替えると、
|
||||
// OFF 期間中に投稿されたノートは Redis 上のキャッシュリストに乗らないまま、
|
||||
// ON 復帰後の新規投稿だけが Redis 先頭に追加されるため、Redis 上の時系列に
|
||||
// ギャップが生じ、タイムライン取得時にギャップ範囲のノートが取りこぼされる問題に対する回帰テスト。
|
||||
// 修正方針: enableFanoutTimeline をトグルすると過渡期 (fanoutTimelineActive=false) に入り、
|
||||
// BullMQ の purgeFanoutTimelines ジョブが Redis 上の list:* をパージしてから
|
||||
// fanoutTimelineActive=true に戻す。過渡期中はデータプレーンが FTTL を一切触らない。
|
||||
describe('FTT toggle purge', () => {
|
||||
// E2E ランナー (test-server/entry.ts) ではジョブキューワーカーがデフォルト無効。
|
||||
// 過渡期解消には purgeFanoutTimelines ジョブの完走が必要なので、ここだけ起動する。
|
||||
let queue: INestApplicationContext;
|
||||
|
||||
beforeAll(async () => {
|
||||
queue = await startJobQueue();
|
||||
}, 1000 * 60 * 2);
|
||||
|
||||
afterAll(async () => {
|
||||
await queue.close();
|
||||
});
|
||||
|
||||
// 直前のトグルが起こしたパージジョブが完走するまで待つ。
|
||||
// `enableFanoutTimeline === fanoutTimelineActive` になったらジョブは終わって
|
||||
// データプレーンが新しい状態に切り替わっている。
|
||||
async function waitForFanoutTimelineSettled(): Promise<void> {
|
||||
for (let i = 0; i < 100; i++) {
|
||||
const res = await api('admin/meta', {}, root);
|
||||
const body = res.body as { enableFanoutTimeline: boolean; fanoutTimelineActive: boolean };
|
||||
if (body.enableFanoutTimeline === body.fanoutTimelineActive) return;
|
||||
await setTimeout(100);
|
||||
}
|
||||
throw new Error('fanoutTimelineActive did not settle in time');
|
||||
}
|
||||
|
||||
test('過渡期中の enableFanoutTimeline 変更は 409 で拒否される', async () => {
|
||||
await api('admin/update-meta', { enableFanoutTimeline: true }, root);
|
||||
await waitForFanoutTimelineSettled();
|
||||
|
||||
// OFF にして過渡期に入れる (パージジョブが走り終わるまでは active=false)
|
||||
const offRes = await api('admin/update-meta', { enableFanoutTimeline: false }, root);
|
||||
assert.strictEqual(offRes.status, 204);
|
||||
|
||||
// active=false の間に enableFanoutTimeline を変えようとすると 409
|
||||
const conflictRes = await api('admin/update-meta', { enableFanoutTimeline: true }, root);
|
||||
assert.strictEqual(conflictRes.status, 400);
|
||||
assert.strictEqual((conflictRes.body as { error: { code: string } }).error.code, 'FANOUT_TIMELINE_TRANSITION_IN_PROGRESS');
|
||||
|
||||
// ジョブ完了後は同じトグルが受理される
|
||||
await waitForFanoutTimelineSettled();
|
||||
const okRes = await api('admin/update-meta', { enableFanoutTimeline: true }, root);
|
||||
assert.strictEqual(okRes.status, 204);
|
||||
await waitForFanoutTimelineSettled();
|
||||
});
|
||||
|
||||
test('enableFanoutTimeline を ON → OFF → ON と切り替えた前後の投稿が全て取得できる (HTL)', async () => {
|
||||
const alice = await signup();
|
||||
|
||||
await api('admin/update-meta', { enableFanoutTimeline: true }, root);
|
||||
await waitForFanoutTimelineSettled();
|
||||
|
||||
// FTT ON 期間 1: 5 件投稿 (Redis に lpush される)
|
||||
const phase1Ids: string[] = [];
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const n = await post(alice, { text: `phase1 ${i}` });
|
||||
phase1Ids.push(n.id);
|
||||
}
|
||||
await setTimeout(500);
|
||||
|
||||
// FTT OFF: 過渡期中はジョブが Redis を purge する。完了で active=false に確定。
|
||||
await api('admin/update-meta', { enableFanoutTimeline: false }, root);
|
||||
await waitForFanoutTimelineSettled();
|
||||
|
||||
// OFF 期間中の投稿: FanoutTimelineService.push が呼ばれず Redis に乗らない
|
||||
const phase2Ids: string[] = [];
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const n = await post(alice, { text: `phase2 ${i}` });
|
||||
phase2Ids.push(n.id);
|
||||
}
|
||||
await setTimeout(500);
|
||||
|
||||
// FTT ON 復帰: 過渡期中は push も get も走らず、ジョブ完了で active=true に戻る
|
||||
await api('admin/update-meta', { enableFanoutTimeline: true }, root);
|
||||
await waitForFanoutTimelineSettled();
|
||||
|
||||
// ON 復帰後の投稿: Redis 先頭に lpush される
|
||||
const phase3Ids: string[] = [];
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const n = await post(alice, { text: `phase3 ${i}` });
|
||||
phase3Ids.push(n.id);
|
||||
}
|
||||
await setTimeout(500);
|
||||
|
||||
const res = await api('notes/timeline', { limit: 20 }, alice);
|
||||
const returnedIds = (res.body as Note[]).map(n => n.id);
|
||||
|
||||
// 期待: 投稿した全 15 件 (phase1 + phase2 + phase3) が返る
|
||||
for (const id of [...phase1Ids, ...phase2Ids, ...phase3Ids]) {
|
||||
assert.ok(returnedIds.includes(id), `missing ${id} in HTL result`);
|
||||
}
|
||||
assert.strictEqual(returnedIds.length, 15);
|
||||
});
|
||||
|
||||
test('enableFanoutTimeline を ON → OFF → ON と切り替えた前後の投稿が全て取得できる (Channel TL)', async () => {
|
||||
const alice = await signup();
|
||||
const channel = await createChannel('fttl-toggle-' + randomString(), alice);
|
||||
|
||||
await api('admin/update-meta', { enableFanoutTimeline: true }, root);
|
||||
await waitForFanoutTimelineSettled();
|
||||
|
||||
const phase1Ids: string[] = [];
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const n = await post(alice, { text: `phase1 ${i}`, channelId: channel.id });
|
||||
phase1Ids.push(n.id);
|
||||
}
|
||||
await setTimeout(500);
|
||||
|
||||
await api('admin/update-meta', { enableFanoutTimeline: false }, root);
|
||||
await waitForFanoutTimelineSettled();
|
||||
|
||||
const phase2Ids: string[] = [];
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const n = await post(alice, { text: `phase2 ${i}`, channelId: channel.id });
|
||||
phase2Ids.push(n.id);
|
||||
}
|
||||
await setTimeout(500);
|
||||
|
||||
await api('admin/update-meta', { enableFanoutTimeline: true }, root);
|
||||
await waitForFanoutTimelineSettled();
|
||||
|
||||
const phase3Ids: string[] = [];
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const n = await post(alice, { text: `phase3 ${i}`, channelId: channel.id });
|
||||
phase3Ids.push(n.id);
|
||||
}
|
||||
await setTimeout(500);
|
||||
|
||||
const res = await api('channels/timeline', { channelId: channel.id, limit: 20 });
|
||||
const returnedIds = (res.body as Note[]).map(n => n.id);
|
||||
|
||||
for (const id of [...phase1Ids, ...phase2Ids, ...phase3Ids]) {
|
||||
assert.ok(returnedIds.includes(id), `missing ${id} in Channel TL result`);
|
||||
}
|
||||
assert.strictEqual(returnedIds.length, 15);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -74,10 +74,11 @@ SPDX-License-Identifier: AGPL-3.0-only
|
|||
|
||||
<div class="_gaps">
|
||||
<SearchMarker>
|
||||
<MkSwitch v-model="fttForm.state.enableFanoutTimeline">
|
||||
<MkSwitch v-model="fttForm.state.enableFanoutTimeline" :disabled="meta.enableFanoutTimeline !== meta.fanoutTimelineActive">
|
||||
<template #label><SearchLabel>{{ i18n.ts.enable }}</SearchLabel><span v-if="fttForm.modifiedStates.enableFanoutTimeline" class="_modified">{{ i18n.ts.modified }}</span></template>
|
||||
<template #caption>
|
||||
<div><SearchText>{{ i18n.ts._serverSettings.fanoutTimelineDescription }}</SearchText></div>
|
||||
<div v-if="meta.enableFanoutTimeline !== meta.fanoutTimelineActive" class="_modified">{{ i18n.ts._serverSettings.fanoutTimelineTransitionInProgress }}</div>
|
||||
<div><MkLink target="_blank" url="https://misskey-hub.net/docs/for-admin/features/ftt/">{{ i18n.ts.details }}</MkLink></div>
|
||||
</template>
|
||||
</MkSwitch>
|
||||
|
|
|
|||
|
|
@ -6900,6 +6900,10 @@ export interface Locale extends ILocale {
|
|||
* 有効にすると、各種タイムラインを取得する際のパフォーマンスが大幅に向上し、データベースへの負荷を軽減することが可能です。ただし、Redisのメモリ使用量は増加します。サーバーのメモリ容量が少ない場合、または動作が不安定な場合は無効にすることができます。
|
||||
*/
|
||||
"fanoutTimelineDescription": string;
|
||||
/**
|
||||
* 切り替えを適用中です。完了するまでこのトグルは操作できません。適用中、タイムラインは一時的にデータベースから取得されます。
|
||||
*/
|
||||
"fanoutTimelineTransitionInProgress": string;
|
||||
/**
|
||||
* データベースへのフォールバック
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -9503,6 +9503,7 @@ export interface operations {
|
|||
manifestJsonOverride: string;
|
||||
policies: Record<string, never>;
|
||||
enableFanoutTimeline: boolean;
|
||||
fanoutTimelineActive: boolean;
|
||||
enableFanoutTimelineDbFallback: boolean;
|
||||
perLocalUserUserTimelineCacheMax: number;
|
||||
perRemoteUserUserTimelineCacheMax: number;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue