Analyzing the MongoDB Transaction WriteConflict Error

Problem Description

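A recent project required synchronizing data across multiple collections, and those collections had to be updated atomically, so I used MongoDB transactions.
While using them, a write to a collection inside a transaction failed with a WriteConflict error, shown below.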

WriteCommandError({
        "errorLabels" : [
                "TransientTransactionError"
        ],
        "operationTime" : Timestamp(1596785629, 1),
        "ok" : 0,
        "errmsg" : "WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction.",
        "code" : 112,
        "codeName" : "WriteConflict"
})

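Based on the error, I looked up the relevant material. The cause is that MongoDB Server uses optimistic locking when it handles concurrent requests, unlike the pessimistic locking in MySQL, where a concurrent modification blocks the later request until the lock is released. Optimistic locking follows a version-control approach: it takes no lock up front and modifies first, and if a concurrent write happens during the modification so that the versions no longer match, it reports an error. On the MongoDB Server side this surfaces as a WriteConflict exception. The official documentation describes this behavior.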
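To make the version-check idea concrete, here is a toy in-memory model of optimistic concurrency control. It is only a sketch of the general technique, not how MongoDB Server is implemented; VersionedStore and WriteConflictError are hypothetical names invented for this illustration.

```javascript
// Toy model of optimistic concurrency control.
// VersionedStore and WriteConflictError are hypothetical, not MongoDB APIs.
class WriteConflictError extends Error {
  constructor(key) {
    super(`WriteConflict on "${key}"`);
    this.codeName = 'WriteConflict';
  }
}

class VersionedStore {
  constructor() {
    this.docs = new Map(); // key -> { value, version }
  }

  // Return the current value together with the version that was read.
  read(key) {
    const doc = this.docs.get(key) || { value: undefined, version: 0 };
    return { value: doc.value, version: doc.version };
  }

  // Write only if nobody else has written since `expectedVersion` was read.
  write(key, value, expectedVersion) {
    const current = this.docs.get(key) || { value: undefined, version: 0 };
    if (current.version !== expectedVersion) {
      throw new WriteConflictError(key); // no waiting: fail fast, the caller retries
    }
    this.docs.set(key, { value, version: current.version + 1 });
  }
}

const store = new VersionedStore();
store.write('account', 100, 0);

// Two writers read the same version of the document.
const a = store.read('account');
const b = store.read('account');

store.write('account', a.value - 10, a.version); // the first writer succeeds

let conflict = null;
try {
  store.write('account', b.value + 5, b.version); // the second writer holds a stale version
} catch (error) {
  conflict = error;
}
console.log(conflict.codeName); // prints "WriteConflict"
```

The second writer is never blocked. It simply fails and has to start over, which is why the fix discussed below is a retry on the client side.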

Excerpt from the official documentation (https://www.mongodb.com/docs/v4.4/core/transactions-production-consideration/#in-progress-transactions-and-write-conflicts)

In-progress Transactions and Write Conflicts
If a transaction is in progress and a write outside the transaction modifies a document that an operation in the transaction later tries to modify, the transaction aborts because of a write conflict.

If a transaction is in progress and has taken a lock to modify a document, when a write outside the transaction tries to modify the same document, the write waits until the transaction ends.

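The first paragraph covers the case where the same document is modified concurrently: the operation in the transaction that modifies the document later is rejected by MongoDB Server with a WriteConflict.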

The official documentation already gives the answer: catch the error on the MongoDB client (that is, the driver) side and retry.

Here is the official sample code:

async function commitWithRetry(session) {
  try {
    await session.commitTransaction();
    console.log('Transaction committed.');
  } catch (error) {
    if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
      console.log('UnknownTransactionCommitResult, retrying commit operation ...');
      await commitWithRetry(session);
    } else {
      console.log('Error during commit ...');
      throw error;
    }
  }
}

async function runTransactionWithRetry(txnFunc, client, session) {
  try {
    await txnFunc(client, session);
  } catch (error) {
    console.log('Transaction aborted. Caught exception during transaction.');

    // If transient error, retry the whole transaction
    if (error.hasErrorLabel('TransientTransactionError')) {
      console.log('TransientTransactionError, retrying transaction ...');
      await runTransactionWithRetry(txnFunc, client, session);
    } else {
      throw error;
    }
  }
}

async function updateEmployeeInfo(client, session) {
  session.startTransaction({
    readConcern: { level: 'snapshot' },
    writeConcern: { w: 'majority' },
    readPreference: 'primary'
  });

  const employeesCollection = client.db('hr').collection('employees');
  const eventsCollection = client.db('reporting').collection('events');

  await employeesCollection.updateOne(
    { employee: 3 },
    { $set: { status: 'Inactive' } },
    { session }
  );
  await eventsCollection.insertOne(
    {
      employee: 3,
      status: { new: 'Inactive', old: 'Active' }
    },
    { session }
  );

  try {
    await commitWithRetry(session);
  } catch (error) {
    await session.abortTransaction();
    throw error;
  }
}

return client.withSession(session =>
  runTransactionWithRetry(updateEmployeeInfo, client, session)
);

Related link: https://www.mongodb.com/docs/v4.4/core/transactions-in-applications/?_ga=2.179099232.531592764.1654272498-452737179.1639669270
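The official sample retries for as long as the error carries the TransientTransactionError label. Under heavy contention it may be worth capping the number of attempts and pausing between them. Below is a minimal sketch of that idea. runWithBoundedRetry, maxAttempts, baseDelayMs and FakeTransientError are hypothetical names of mine, and the fake error only stands in for what the driver would throw. In real code, txnFunc would start, run and commit the transaction, as updateEmployeeInfo does above.

```javascript
// Sketch: retry a transaction body a bounded number of times on TransientTransactionError.
// runWithBoundedRetry, maxAttempts and baseDelayMs are hypothetical names.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

async function runWithBoundedRetry(txnFunc, { maxAttempts = 5, baseDelayMs = 10 } = {}) {
  for (let attempt = 1; ; attempt++) {
    try {
      return await txnFunc(attempt);
    } catch (error) {
      const transient =
        typeof error.hasErrorLabel === 'function' &&
        error.hasErrorLabel('TransientTransactionError');
      if (!transient || attempt >= maxAttempts) {
        throw error; // not retryable, or out of attempts
      }
      // Exponential backoff with jitter so colliding transactions do not retry in lockstep.
      const delay = baseDelayMs * 2 ** (attempt - 1) * (0.5 + Math.random() / 2);
      await sleep(delay);
    }
  }
}

// Stand-in for a driver error; real errors come from the MongoDB driver.
class FakeTransientError extends Error {
  hasErrorLabel(label) {
    return label === 'TransientTransactionError';
  }
}

// Fails twice with a transient error, then succeeds on the third attempt.
const demo = runWithBoundedRetry(async attempt => {
  if (attempt < 3) throw new FakeTransientError('WriteConflict');
  return `committed on attempt ${attempt}`;
}, { baseDelayMs: 1 });

demo.then(result => console.log(result)); // prints "committed on attempt 3"
```

The Node.js driver's callback API, session.withTransaction, which the later examples in this post use, already incorporates retry logic for these error labels, so a hand-written loop like this mainly matters when using startTransaction and commitTransaction directly.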

A few practical ways to tune transactions follow.

a) Avoid using transactions

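Some ways of writing an update avoid the need for a transaction altogether. Here is one idea that suits small data sets. Take a transfer between two branches written as a transaction: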

try {
    await session.withTransaction(async () => {
        await db.collection('branches').updateOne({
            _id: fromBranch
        }, {
            $inc: {
                balance: -1 * dollars
            }
        }, {
            session
        });
        await db.collection('branches').updateOne({
            _id: toBranch
        }, {
            $inc: {
                balance: dollars
            }
        }, {
            session
        });
    }, transactionOptions);
} catch (error) {
    console.log(error.message);
}

If the number of branches is small, consider merging them into a single document, for example:

mongo > db.embeddedBranches.findOne();
{
    "_id": 1,
    "branchTotals": [
        { "branchId": 0, "balance": 101208675 },
        { "branchId": 1, "balance": 98409758 },
        { "branchId": 2, "balance": 99407654 },
        { "branchId": 3, "balance": 98807890 }
    ]
}

With that layout, the transfer no longer needs a transaction, because a single-document update is atomic:

try {
    // Debit one array slot and credit another in a single atomic update.
    // The positional paths assume that branchId equals the array index.
    const updateClause = {
        $inc: {
            [`branchTotals.${fromBranch}.balance`]: -1 * dollars,
            [`branchTotals.${toBranch}.balance`]: dollars
        }
    };
    await db.collection('embeddedBranches').updateOne({
        _id: 1
    }, updateClause);
} catch (error) {
    console.log(error.message);
}
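The update above addresses array elements by position, which only works while branchId happens to equal the array index. If that does not hold, the filtered positional operator $[<identifier>] with arrayFilters can select elements by branchId instead. The sketch below only builds the arguments for updateOne. buildTransferUpdate is a hypothetical helper name, and I am assuming the two filters match different elements (fromBranch differs from toBranch).

```javascript
// Sketch: build updateOne() arguments that match array elements by branchId
// rather than by array position. buildTransferUpdate is a hypothetical helper.
function buildTransferUpdate(fromBranch, toBranch, dollars) {
  const filter = { _id: 1 };
  const update = {
    $inc: {
      'branchTotals.$[from].balance': -1 * dollars, // debit the source branch
      'branchTotals.$[to].balance': dollars         // credit the destination branch
    }
  };
  const options = {
    arrayFilters: [
      { 'from.branchId': fromBranch },
      { 'to.branchId': toBranch }
    ]
  };
  return [filter, update, options];
}

const args = buildTransferUpdate(0, 3, 50);
console.log(JSON.stringify(args[1]));

// Intended use (not executed here):
//   await db.collection('embeddedBranches').updateOne(...args);
```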

b) Reorder the operations in a transaction to shrink the critical section

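Take a money transfer as an example. It has three operations: op1 increments a global transaction counter, op2 decreases the balance of one account, and op3 increases the balance of another account.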

1  await session.withTransaction(async () => {
2      await db.collection('txnTotals').
3        updateOne({ _id: 1 },
4          { $inc: { counter: 1 } },
5          { session });
6      await db.collection('accounts').
7        updateOne({ _id: fromAcc },
8          { $inc: { balance: -1 * dollars } },
9          { session });
10     await db.collection('accounts').
11       updateOne({ _id: toAcc },
12         { $inc: { balance: dollars } },
13         { session });
14 }, transactionOptions);

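For every transfer, the critical section currently starts at line 2, because the global counter is the document that all transactions contend for. If op1 is moved to the end (where op3 is now), the critical section starts only at line 10. A shorter critical section lowers the chance of collisions and therefore reduces write conflicts.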
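A sketch of the reordered transaction body, with the counter update moved to the end, might look like this. transferWithCounterLast is a hypothetical wrapper name. db, session, fromAcc, toAcc, dollars and transactionOptions are assumed to be the same values as in the listing above.

```javascript
// Sketch of the reordered transaction body: the hot counter is updated last.
// transferWithCounterLast is a hypothetical wrapper; db and session are passed in.
async function transferWithCounterLast(db, session, fromAcc, toAcc, dollars) {
  await db.collection('accounts').updateOne(
    { _id: fromAcc },
    { $inc: { balance: -1 * dollars } },
    { session }
  );
  await db.collection('accounts').updateOne(
    { _id: toAcc },
    { $inc: { balance: dollars } },
    { session }
  );
  // The global counter is touched only at the very end of the transaction,
  // so the contended document is held for the shortest possible time.
  await db.collection('txnTotals').updateOne(
    { _id: 1 },
    { $inc: { counter: 1 } },
    { session }
  );
}

// Intended use (not executed here):
//   await session.withTransaction(
//     () => transferWithCounterLast(db, session, fromAcc, toAcc, dollars),
//     transactionOptions
//   );
```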

c) Partition hot data (Partitioning Hot Documents)

Take the global counter again. If every transfer transaction contains this operation, the txnTotals document is clearly a hot spot:

await db.collection('txnTotals').updateOne(
    { _id: 1 },
    { $inc: { counter: 1 } },
    { session }
);

We can partition it to lower the contention, as follows:

// Spread the increments over 10 counter documents (_id 0 to 9).
let id = Math.floor(Math.random() * 10);
await db.collection('txnTotals').updateOne(
    { _id: id },
    { $inc: { counter: 1 } },
    { session }
);
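The price of partitioning is that the counter no longer lives in one place: reading the total means summing all the partition documents. The sketch below shows the arithmetic with an in-memory stand-in for the collection, plus an aggregation pipeline that could compute the same sum on the server. PARTITIONS, pickPartitionId and totalPipeline are hypothetical names. The sketch also assumes the ten partition documents already exist, since updateOne without upsert: true does nothing when no document matches.

```javascript
// Sketch: a counter split across PARTITIONS documents must be summed when read.
// PARTITIONS, pickPartitionId and totalPipeline are hypothetical names.
const PARTITIONS = 10;

// Choose which partition document (_id 0..PARTITIONS-1) a transaction increments.
function pickPartitionId() {
  return Math.floor(Math.random() * PARTITIONS);
}

// Aggregation pipeline that adds up the `counter` field of every partition.
const totalPipeline = [
  { $group: { _id: null, total: { $sum: '$counter' } } }
];

// In-memory stand-in for the txnTotals collection, to show the arithmetic.
const partitions = Array.from({ length: PARTITIONS }, (_, id) => ({ _id: id, counter: 0 }));
for (let i = 0; i < 1000; i++) {
  partitions[pickPartitionId()].counter += 1;
}
const total = partitions.reduce((sum, doc) => sum + doc.counter, 0);
console.log(total); // prints 1000: no increment is lost, they are only spread out

// Intended use against MongoDB (not executed here):
//   const result = await db.collection('txnTotals').aggregate(totalPipeline).toArray();
```

With ten partitions, two concurrent transactions pick the same counter document only about one time in ten, at the cost of a slightly more expensive read.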