事务中断
有两种情况会导致事务中断
- 系统由于内部原因退出
- 用户手动输入ROLLBACK
之所以要区分这两种,是因为用户手动推出后,事务状态就回到了默认状态,而由于系统原因退出后仍会有同一事务块的语句到来,此时需要忽略它们。
两种事务中断导致的状态变化
用户Rollback
调用UserAbortTransactionBlock
,根据事务块状态修改状态。
状态如果是TBLOCK_INPROGRESS
则设置为TBLOCK_ABORT_PENDING
等待CommitTransactionCommand
执行回滚
如果是TBLOCK_ABORT
则设置为TBLOCK_ABORT_END
说明已经完成回滚不需要做其他事。
如果在子事务块中,则判断循环找到父事务块,判断父事务块状态,父事务块状态,如果父事务块是SUBINPROGRESS则设置为SUBABORT_PENDING等待回滚,如果是SUBABORT则设置为SUBABORT_END。如果均不是,则报错。
如果在非事务块或者隐式事务块中,既处于STARTED或者IMPLICIT_INPROGRESS状态,则报错,设置状态为TBLOCK_ABORT_PENDING
,等待CommitTransactionCommand
将状态设置回默认状态。
如果在parallel工作中手动触发abort,无法处理报错。
其它所有状态均因为正常情况不可能出现报错。
系统原因
调用AbortCurrentTransaction
根据事务块状态修改状态,并调用AbortTransaction
完成abort。
如果事务块和事务都在DEFAULT
状态,本来就无事可做,直接不管。如果事务块在默认状态但是事务不在,则可能是事务开始过程中出错。如果是开始过程中出错,设置事务状态为INPROGRESS状态以压制AbortTransaction
中的warning。
如果不在DEFAULT状态,同时也不在事务块内、刚BEGIN或END、处于PREPARE状态,正常处理后事务块改为默认状态。
如果在INPROGRESS或者PARALLEL_INPROGRESS,调用AbortTransaction
后设置为ABORT状态等待回滚。
如果在ABORT或者SUBABORT状态,说明在已经等待回滚,不管。
如果在ABORT_END状态,说明已经完成回滚,调用CleanupTransaction
清理事务并回到默认状态。
如果在ABORT_PENDING状态说明读取到了ROLLBACK指令,回滚、清理,回到默认状态。
如果处于SUBINPROGRESS状态,说明在子事务中发现错误,调用AbortSubTransaction
回滚子事务,设置当前事务块状态为SUBABORT。
如果正在创建、结束子事务时出错,回滚清理子事务,递归调用自己触发abort。如果处于SUBABORT_END或者SUBABORT_RESTART,与上述相同,只是不需要abort子事务。
底层如何处理事务中断和清理
事务中断
AbortTransaction
一旦发现事务失败时就执行,释放所有共享资源(如锁)从而不影响其它后端。
关闭中断
中断过程不能被其它原因影响。
在大多数情况下切换内存上下文到TransactionAbortContext,极少数情况其为空,则切换到TopMemoryContext。
CurrentResourceOwner = TopTransactionResourceOwner;
,将当前资源拥有者设置为顶层事务资源拥有者。
上述二者确保内存上下文和资源跟踪器有效
调用LWLockReleaseAll释放所有轻量级锁
LWLock主要提供对共享内存变量的互斥访问,比如Clog buffer(事务提交状态缓存)、Shared buffers(数据页缓存)、Substran buffer(子事务缓存)等等。
释放是因为在清理过程中可能会用到。
普通锁不释放,在中断后释放。
1
2
3
4
5
6
7
8
9
10
|
void
LWLockReleaseAll(void)
{
while (num_held_lwlocks > 0)
{
HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */
LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
}
}
|
代码中循环检测拥有的轻量级锁,并循环尝试释放。
清理等待信息和指令指标
1
2
3
4
5
6
|
static inline void
pgstat_report_wait_end(void)
{
/* see pgstat_report_wait_start() */
*(volatile uint32 *) my_wait_event_info = 0;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
void
pgstat_progress_end_command(void)
{
volatile PgBackendStatus *beentry = MyBEEntry;
if (!beentry || !pgstat_track_activities)
return;
if (beentry->st_progress_command == PROGRESS_COMMAND_INVALID)
return;
PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
beentry->st_progress_command = PROGRESS_COMMAND_INVALID;
beentry->st_progress_command_target = InvalidOid;
PGSTAT_END_WRITE_ACTIVITY(beentry);
}
|
调用AbortBufferIO、UnlockBuffers
这一步的目的是释放所有的BufferPin锁,对于Buffer的访问由Buffer pin锁保护,而这个锁并不会在LW锁释放的过程中被释放,故需要中断、释放。
对于共享缓存内容的锁已经释放过了,此处只需要释放清理PIN_COUNT请求。
清理预写记录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
XLogResetInsertion(void)
{
int i;
for (i = 0; i < max_registered_block_id; i++)
registered_buffers[i].in_use = false;
num_rdatas = 0;
max_registered_block_id = 0;
mainrdata_len = 0;
mainrdata_last = (XLogRecData *) &mainrdata_head;
curinsert_flags = 0;
begininsert_called = false;
}
|
取消自己在任何睡眠条件变量队列中的记录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
ConditionVariableCancelSleep(void)
{
ConditionVariable *cv = cv_sleep_target;
bool signaled = false;
if (cv == NULL)
return;
SpinLockAcquire(&cv->mutex);
if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
else
signaled = true;
SpinLockRelease(&cv->mutex);
if (signaled)
ConditionVariableSignal(cv);
cv_sleep_target = NULL;
}
|
清理所有对锁的等待LockErrorCleanup
如果不执行此步,对其他锁的申请将会无法执行。
重新规划活动超时事件reschedule_timeouts
在清理锁等待后执行,从而避免无意义规划死锁检查超时事件。
设置事务状态
设置事务状态为ABORT
其它操作
1
|
SetUserIdAndSecContext(s->prevUser, s->prevSecContext);
|
设置回原有的Userid和安全控制上下文
1
|
ResetReindexState(s->nestingLevel);
|
重置重索引状态
1
|
esetLogicalStreamingState();
|
重置逻辑流状态
1
|
SnapBuildResetExportedSnapshotState();
|
清除快照导出状态
1
2
3
4
5
|
if (IsInParallelMode())
{
AtEOXact_Parallel(false);
s->parallelModeLevel = 0;
}
|
如果在并行模式,清理所有工作者并退出并行模式
1
2
3
4
5
6
7
|
AfterTriggerEndXact(false); /* 'false' means it's abort */
AtAbort_Portals();
smgrDoPendingSyncs(false, is_parallel_worker);
AtEOXact_LargeObject(false);
AtAbort_Notify();
AtEOXact_RelationMap(false, is_parallel_worker);
AtAbort_Twophase();
|
一系列中断操作
1
2
3
4
5
6
7
|
if (!is_parallel_worker)
latestXid = RecordTransactionAbort(false);
else
{
latestXid = InvalidTransactionId;
XLogSetAsyncXactLSN(XactLastRecEnd);
}
|
如果当前不在并行模式下,调用RecordTransactionAbort函数记录事务中止的信息,并将返回的XID赋值给latestXid变量。
如果当前在并行模式下,将latestXid设置为InvalidTransactionId,表示没有有效的XID可以用于广告。调用XLogSetAsyncXactLSN函数,将XactLastRecEnd的值写入WAL(Write-Ahead Logging)日志中。
这样做的原因是,在并行模式下,主节点无法获取到当前事务的XactLastRecEnd值,因此需要通过写入WAL日志来通知并行工作的节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
if (TopTransactionResourceOwner != NULL)
{
if (is_parallel_worker)
CallXactCallbacks(XACT_EVENT_PARALLEL_ABORT);
else
CallXactCallbacks(XACT_EVENT_ABORT);
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_BEFORE_LOCKS,
false, true);
AtEOXact_Buffers(false);
AtEOXact_RelationCache(false);
AtEOXact_Inval(false);
AtEOXact_MultiXact();
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_LOCKS,
false, true);
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_AFTER_LOCKS,
false, true);
smgrDoPendingDeletes(false);
AtEOXact_GUC(false, 1);
AtEOXact_SPI(false);
AtEOXact_Enum();
AtEOXact_on_commit_actions(false);
AtEOXact_Namespace(false, is_parallel_worker);
AtEOXact_SMgr();
AtEOXact_Files(false);
AtEOXact_ComboCid();
AtEOXact_HashTables(false);
AtEOXact_PgStat(false, is_parallel_worker);
AtEOXact_ApplyLauncher(false);
pgstat_report_xact_timestamp(0);
}
|
一顿操作,大概是释放拥有的资源。
事务清理
CleanupTransaction
在最后遇见COMMIT或者ROLLBACK时执行,进行最后的清理彻底退出事务。
值得注意的是当事务提交后并不是立刻推出,而是设置为TBLOCK_END
状态,在CommitTransactionCommand
被调用时关闭事务。这使得控制权脱离事务管理,主循环可以继续在同一个事务中运行。