PostgreSQL源码学习–更新数据#3[亲测有效]

PostgreSQL源码学习–更新数据#3[亲测有效]本节介绍ExecUpdate函数。 ExecUpdate函数 static TupleTableSlot * ExecUpdate(ModifyTableState *mtstate, ItemPo…

PostgreSQL源码学习--更新数据#3

本节介绍ExecUpdate函数。

ExecUpdate函数

static TupleTableSlot *
ExecUpdate(ModifyTableState *mtstate,
		   ItemPointer tupleid,
		   HeapTuple oldtuple,
		   TupleTableSlot *slot,
		   TupleTableSlot *planSlot,
		   EPQState *epqstate,
		   EState *estate,
		   bool canSetTag);

代码100分

代码100分/* 必须在(隐式)事务中进行 */
if (IsBootstrapProcessingMode())
	elog(ERROR, "cannot UPDATE during bootstrap");

ExecMaterializeSlot(slot);

/* 获取结果关系的信息 */
resultRelInfo = estate->es_result_relation_info;
resultRelationDesc = resultRelInfo->ri_RelationDesc;

/* BEFORE ROW UPDATE 触发器 */
if (resultRelInfo->ri_TrigDesc &&
	resultRelInfo->ri_TrigDesc->trig_update_before_row)
{
	if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
						  tupleid, oldtuple, slot))
		return NULL;		/* "do nothing" */
}

/* INSTEAD OF ROW UPDATE 触发器 */
if (resultRelInfo->ri_TrigDesc &&
	resultRelInfo->ri_TrigDesc->trig_update_instead_row)
{
	if (!ExecIRUpdateTriggers(estate, resultRelInfo,
						  oldtuple, slot))
		return NULL;		/* "do nothing" */
}
/* 若是外部表 */
else if (resultRelInfo->ri_FdwRoutine)
{
	/* 先计算生成列 */
	if (resultRelationDesc->rd_att->constr &&
		resultRelationDesc->rd_att->constr->has_generated_stored)
		ExecComputeStoredGenerated(estate, slot);
	
	/* update外部表,使用fdw */
	slot = resultRelInfo->ri_FdwRoutine->ExecForeignUpdate(estate,
								   resultRelInfo,
								   slot,
								   planSlot);
	
	if (slot == NULL)	/* "do nothing" */
		return NULL;
	
	/* AFTER ROW触发器或RETURNING表达式有可能会用到tableoid,所以初始化一下tts_tableOid */
	slot->tts_tableOid = RelationGetRelid(resultRelationDesc);
}
/* 一般的update流程 */
else
{
	/* 约束表达式可能会用到tableoid,因此初始化一下tts_tableOid */
	slot->tts_tableOid = RelationGetRelid(resultRelationDesc);
	
	/* 计算生成列 */
	if (resultRelationDesc->rd_att->constr &&
		resultRelationDesc->rd_att->constr->has_generated_stored)
		ExecComputeStoredGenerated(estate, slot);
	
/* goto跳跃点 */
lreplace:
	/* 确保slot被物化(考虑到可能到来的EPQ等) */
	ExecMaterializeSlot(slot);
	
	/* 检查元组是否符合分区约束条件 */
	partition_constraint_failed =
		resultRelInfo->ri_PartitionCheck &&
		!ExecPartitionCheck(resultRelInfo, slot, estate, false);
	
	/* 当不符合分区约束条件时跳过WCO检查
	 *(因为在插入前的触发器可能还会更改元组,所以wco放到插入数据前检查) */
	if (!partition_constraint_failed &&
		resultRelInfo->ri_WithCheckOptions != NIL)
	{
		ExecWithCheckOptions(WCO_RLS_UPDATE_CHECK,
					 resultRelInfo, slot, estate);
	}
	
	/* 若不符合分区约束,尝试将行移动到正确的分区中 */
	if (partition_constraint_failed)
	{
		PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
		
		/* 若是INSERT ON CONFLICT DO UPDATE导致的update,这意味着当前元组不符合分区约束
		 * 但是当前分区却存在有冲突的元组,这样的话报错 */
		if (((ModifyTable *) mtstate->ps.plan)->onConflictAction == ONCONFLICT_UPDATE)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("invalid ON UPDATE specification"),
					 errdetail("The result tuple would appear in a different partition than the original tuple.")));
		
		/* 没有设置分区路由表明现在在叶分区上执行操作,因此出现了不符合分区约束那么就报错 */
		if (proute == NULL)
			ExecPartitionCheckEmitError(resultRelInfo, slot, estate);
		
		/* 删除元组,但不进行RETURNING,insert操作的时候再返回 */
		ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate,
				   estate, false, false /* canSetTag */ ,
				   true /* changingPart */ , &tuple_deleted, &epqslot);
		
		/* 若因为各种原因没能成功删除 */
		if (!tuple_deleted)
		{
			/* epqslot通常为空,但是若删除时发现另一个事务同时更新了同一行,那么重新检查 */
			if (TupIsNull(epqslot))
				return NULL;
			else
			{
				slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);
				goto lreplace;
			}
		}
		
		/* 保存update操作的转换捕获映射,因为在插入之后需要恢复回来 */
		if (mtstate->mt_transition_capture)
			saved_tcs_map = mtstate->mt_transition_capture->tcs_map;
		
		/* 获取resultRel在mtstate->resultRelInfo中的位置,执行元组转换 */
		map_index = resultRelInfo - mtstate->resultRelInfo;
		Assert(map_index >= 0 && map_index < mtstate->mt_nplans);
		tupconv_map = tupconv_map_for_subplan(mtstate, map_index);
		if (tupconv_map != NULL)
			slot = execute_attr_map_slot(tupconv_map->attrMap,
							 slot,
							 mtstate->mt_root_tuple_slot);
		
		/* 准备元组路由 */
		Assert(mtstate->rootResultRelInfo != NULL);
		slot = ExecPrepareTupleRouting(mtstate, estate, proute,
						   mtstate->rootResultRelInfo, slot);
		
		/* 执行插入 */
		ret_slot = ExecInsert(mtstate, slot, planSlot,
								  estate, canSetTag);
		
		/* 还原一些临时更改 */
		estate->es_result_relation_info = resultRelInfo;
		if (mtstate->mt_transition_capture)
		{
			mtstate->mt_transition_capture->tcs_original_insert_tuple = NULL;
			mtstate->mt_transition_capture->tcs_map = saved_tcs_map;
		}
		
		/* 向其它分区插入的update操作完成 */
		return ret_slot;
	}
	
	/* 检查元组分区约束以外的其它约束条件 */
	if (resultRelationDesc->rd_att->constr)
		ExecConstraints(resultRelInfo, slot, estate);
	
	/* 执行update操作 */
	result = table_tuple_update(resultRelationDesc, tupleid, slot,
						estate->es_output_cid,
						estate->es_snapshot,
						estate->es_crosscheck_snapshot,
						true /* wait for commit */ ,
						&tmfd, &lockmode, &update_indexes);
	
	switch (result)
	{
		/* 目标元组已经被当前命令或当前事务的后续命令更新或删除了 */
		case TM_SelfModified:
			if (tmfd.cmax != estate->es_output_cid)
				ereport(ERROR,
						(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
						 errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
						 errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));

			/* Else, already updated by self; nothing to do */
			return NULL;
		
		/* 正常更新 */
		case TM_Ok:
			break;
			
		/* 目标元组已经被其它事务更新 */
		case TM_Updated:
			{
				/* 检查事务隔离级别是否大于等于可重复读 */
				if (IsolationUsesXactSnapshot())
					ereport(ERROR,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("could not serialize access due to concurrent update")));
				
				/* 获取或创建EPQ测试要用的slot,然后锁定最新版本的元组,并将其放入inputslot */
				inputslot = EvalPlanQualSlot(epqstate, resultRelationDesc,
									 resultRelInfo->ri_RangeTableIndex);

				result = table_tuple_lock(resultRelationDesc, tupleid,
								  estate->es_snapshot,
								  inputslot, estate->es_output_cid,
								  lockmode, LockWaitBlock,
								  TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
								  &tmfd);
				
				switch (result)
				{
					case TM_Ok:
						Assert(tmfd.traversed);
						
						/* 判断是否应该在读已提交的规则下处理已更新的元组,如果为否的话,返回NULL */
						epqslot = EvalPlanQual(epqstate,
									   resultRelationDesc,
									   resultRelInfo->ri_RangeTableIndex,
									   inputslot);
						if (TupIsNull(epqslot))
							return NULL;
						
						slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);
							goto lreplace;
						
					/* 元组已被删除 */
					case TM_Deleted:
						return NULL;
						
					/* 若已经被自身这条命令更新,则跳过更新操作,否则报错 */
					case TM_SelfModified:
						if (tmfd.cmax != estate->es_output_cid)
							ereport(ERROR,
								(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
								 errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
								 errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
						return NULL;
						
					/* 其它出错情况 */
					default:
						elog(ERROR, "unexpected table_tuple_lock status: %u",
							 result);
						return NULL;
				}
			}
			
			break;
			
		/* 目标元组已被其它事务删除 */
		case TM_Deleted:
			if (IsolationUsesXactSnapshot())
				ereport(ERROR,
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
						 errmsg("could not serialize access due to concurrent delete")));
			return NULL;
			
		/* 其它出错情况 */
		default:
			elog(ERROR, "unrecognized table_tuple_update status: %u",
					 result);
			return NULL;
	}
	
	/* 有需要的话,为元组插入索引 */
	if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
		recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL, NIL);
}

/* 有需要的话,统计处理的元组数目 */
if (canSetTag)
	(estate->es_processed)++;

/* AFTER ROW UPDATE 触发器 */
ExecARUpdateTriggers(estate, resultRelInfo, tupleid, oldtuple, slot,
				 recheckIndexes,
				 mtstate->operation == CMD_INSERT ?
				 mtstate->mt_oc_transition_capture :
				 mtstate->mt_transition_capture);

list_free(recheckIndexes);

/* 检查父视图中的所有WITH CHECK OPTION约束 */
if (resultRelInfo->ri_WithCheckOptions != NIL)
	ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);

/* 处理RETURNING */
if (resultRelInfo->ri_projectReturning)
	return ExecProcessReturning(resultRelInfo, slot, planSlot);

return NULL;

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/7884.html

(0)
上一篇 2023-03-12
下一篇 2023-03-12

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注