チェンジストリーム

チェンジストリームを使用すると、特定のモデルのコレクション内のドキュメント、またはデータベース全体内のドキュメントに対する更新情報をリッスンできます。ミドルウェアとは異なり、チェンジストリームはMongoDBサーバーの構造体であり、つまり、どこからでも変更を拾い上げます。MongoDB GUIからドキュメントを更新した場合でも、Mongooseチェンジストリームは通知を受けます。

watch() 関数はチェンジストリームを作成します。チェンジストリームは、ドキュメントの更新時に'data' イベントを発行します。

const Person = mongoose.model('Person', new mongoose.Schema({ name: String }));

// Create a change stream. The 'change' event gets emitted when there's a
// change in the database. Print what the change stream emits.
Person.watch().
  on('change', data => console.log(data));

// Insert a doc, will trigger the change stream handler above
await Person.create({ name: 'Axl Rose' });

上記のスクリプトは、次のような出力を印刷します。

{ _id: { _data: '8262408DAC000000012B022C0100296E5A10042890851837DB4792BE6B235E8B85489F46645F6964006462408DAC6F5C42FF5EE087A20004' }, operationType: 'insert', clusterTime: new Timestamp({ t: 1648397740, i: 1 }), fullDocument: { _id: new ObjectId("62408dac6f5c42ff5ee087a2"), name: 'Axl Rose', __v: 0 }, ns: { db: 'test', coll: 'people' }, documentKey: { _id: new ObjectId("62408dac6f5c42ff5ee087a2") } }

チェンジストリームを使用するには、必ずMongoDBレプリカセットまたはシャード化されたクラスタに接続している必要があります。スタンドアロンのMongoDBサーバーに接続しているときにwatch()を呼び出そうとすると、以下のエラーが発生します。

MongoServerError: $changeStream ステージは、レプリカセットでのみサポートされています

運用環境でwatch()を使用している場合は、MongoDB Atlasの使用を推奨します。ローカル開発には、レプリカセットをローカルで開始するためのmongodb-memory-serverまたはrun-rsを使用することを推奨します。

next()を使用して反復処理する

AWS Lambda 関数でチェンジストリームを反復処理する場合は、チェンジストリームをリッスンするためにイベントエミッターを使用しないでください。Lambdaが、チェンジストリームがMongoDBからデータをプルしている間にコンテナを停止した場合には、チェンジストリームの状態が矛盾する可能性があるため、Lambda関数の実行が終わったときにチェンジストリームを閉じる必要があります。

チェンジストリームには、次の変更が来るまで明示的に待機できるnext()関数もあります。resumeAfterを使用して、最後のチェンジストリームが残した場所を追跡し、変更が入ってこない場合にハンドラーが無限に待機しないようにタイムアウトを追加します。

let resumeAfter = undefined;

exports.handler = async(event, context) => {
  // add this so that we can re-use any static/global variables between function calls if Lambda
  // happens to re-use existing containers for the invocation.
  context.callbackWaitsForEmptyEventLoop = false;

  await connectToDatabase();

  const changeStream = await Country.watch([], { resumeAfter });

  // Change stream `next()` will wait forever if there are no changes. So make sure to
  // stop listening to the change stream after a fixed period of time.
  const timeoutPromise = new Promise(resolve => setTimeout(() => resolve(false), 1000));
  let doc = null;
  while (doc = await Promise.race([changeStream.next(), timeoutPromise])) {
    console.log('Got', doc);
  }

  // `resumeToken` tells you where the change stream left off, so next function instance
  // can pick up any changes that happened in the meantime.
  resumeAfter = changeStream.resumeToken;
  await changeStream.close();
};