Postgresql源码分析returns setof函数oracle管道pipelined

2023-02-01 09:18:46
目录
引言【功能】【代码】【实用函数】1 :管道函数是什么,应用于什么场景2 return next实现具体处理函数:exec_stmt_return_next1 初始化tuple store场景一:return next返回var类型场景二:return next返回record类型场景三:return next返回row类型3 用例

引言

【功能】

    Oracle的return>Postgresql的return setof函数并不能起到降低内存使用的效果,return next 单条数据只起到了缓存的效果,并不会把数据逐条返回SQL层处理,没有降低内存的效果。

    【代码】

      exec_stmt_return_next中的tupledesc从执行计划node中取出,返回值需要满足desc要求,缓存值也会按该desc保存。return>rec类型本质上就是tuple,数据和desc都以扩展形式存放在erh中。如果需要转换为tuple,有几个标准函数提供转换功能,且支持类型转换。【转换后调用tuplestore的标准接口缓存tuple】row类型本质上是一个虚拟行(由一组datum位置组成),row->varnos[i]指向某一个datum,如果想把row转换为tuple,需要用exec_eval_datum算出varnos指向的datum的值,然后组装成values和nulls数组,用heap_form_tuple构造。注意这种转换过程不会有类型转换,如果需要的desc和算出来的列类型对不上,返回空。成功【转换后调用tuplestore的标准接口缓存tuple】return next对var类型的处理:var看做单列tuple,按执行计划给的desc转换类型后构造tuple。【转换后调用tuplestore的标准接口缓存tuple】

      【实用函数】

        通用
          类型转换:exec_cast_value(传入的值不能是eoh真实的头,使用前需要转成eoh存的1be头,1be指向真实头)数组拼接minimaltuple:heap_form_minimal_tuple有一个tuple和desc转换为另一个desc的tuple:convert_tuples_by_position、execute_attr_map_tupletuplestore:
            用values数组存tuple(用tuplestore_puttuple_common拼好后传tuple):tuplestore_putvalues用HeapTuple存tuple(直接传tuple):tuplestore_puttuple类型
              根据类型id和mod找出desc:lookup_rowtype_tupdescerh
                从erh扩展类型拿到紧凑tuple:expanded_record_get_tuple

                1>

                oracle支持pipelined函数,可以在函数定义时指定RETURN 集合类型 PIPELINED  来说明当前函数是管道函数。

                管道函数最大的作用就是可以使一次返回的集合类型,变为 逐条返回,大大减少内存的使用。

                例如:嵌套表类型outrecset是函数f_trans的返回值,普通函数只能组装好嵌套表outrecset(全部缓存在内存),一次性返回。如果嵌套表内容较多,可能会占用较大的内存空间。

                如果使用管道函数,可以通过pipe row(嵌套表中的一行)来代替return语句,函数把嵌套表逐行返回给上层处理,无需缓存,降低内存使用。

                ORACLE实例:

                CREATE OR REPLACE PACKAGE refcur_pkg AUTHID DEFINER IS
                  TYPE refcur_t IS REF CURSOR RETURN employees%ROWTYPE;
                  TYPE outrec_typ IS RECORD (
                    var_num    NUMBER(6),
                    var_char1  VARCHAR2(30),
                    var_char2  VARCHAR2(30)
                  );
                  TYPE outrecset IS TABLE OF outrec_typ;
                  FUNCTION f_trans (p refcur_t) RETURN outrecset PIPELINED;
                END refcur_pkg;
                /
                CREATE OR REPLACE PACKAGE BODY refcur_pkg IS
                  FUNCTION f_trans (p refcur_t) RETURN outrecset PIPELINED IS
                    out_rec outrec_typ;
                    in_rec  p%ROWTYPE;
                  BEGIN
                    LOOP
                      FETCH p INTO in_rec;  -- input row
                      EXIT WHEN p%NOTFOUND;
                      out_rec.var_num := in_rec.employee_id;
                      out_rec.var_char1 := in_rec.first_name;
                      out_rec.var_char2 := in_rec.last_name;
                      PIPE ROW(out_rec);  -- first transformed output row
                      out_rec.var_char1 := in_rec.email;
                      out_rec.var_char2 := in_rec.phone_number;
                      PIPE ROW(out_rec);  -- second transformed output row
                    END LOOP;
                    CLOSE p;
                    RETURN;
                  END f_trans;
                END refcur_pkg;
                /
                SELECT * FROM TABLE (
                  refcur_pkg.f_trans (
                    CURSOR (SELECT * FROM employees WHERE department_id = 60)
                  )
                );

                在PG中,普通的return语句也是需要一次性返回数据,但PG应该是参考ORACLE实现了return next的功能,也希望逐条返回数据(PG没有集合类型,已普通类型为例):

                drop function f1;
                create or replace function f1(in i int, out j int) returns setof int as $$
                begin
                  j := i+1;
                  return next;
                  j := i+2;
                  return next;
                  return;
                end$$ language plpgsql;
                select * from f1(42);
                 j  
                ----
                 43
                 44

                但在内核实现中,并不是逐条返回的,return next其实只起到了缓存数据的功能,总的数据集也是一次性返回SQL层的,和直接return没有区别(只有语法上的区别)。

                所以PG的return setof函数并不能起到降低内存使用的效果。下面来分析具体过程。

                2>

                return next目前支持三类数据的返回,var、rec、rows return next也可以不加参数,返回值按out参数列表拼接

                具体处理函数:exec_stmt_return_next

                static int
                exec_stmt_return_next(PLpgSQL_execstate *estate,
                					  PLpgSQL_stmt_return_next *stmt)
                {
                	TupleDesc	tupdesc;
                	int			natts;
                	HeapTuple	tuple;
                	MemoryContext oldcontext;

                1>

                初始化总结:

                1 初始化的过程就是在构造Tuplestorestate,主要动作:

                  给Tuplestorestate新的内存上下文ExecutorState记录不能随机访问:eflags = EXEC_FLAG_REWIND记录三个操作函数:copytup_heap、writetup_heap、readtup_heap

                  2 给estate->tuple_store_desc添加desc,desc来源:

                    从执行计划节点中node(Tuplestorestate)拿到后,传入ExecMakeTableFunctionResultExecMakeTableFunctionResult组装ReturnSetInfo挂到fcinfo->resultinfo上plpgsql_exec_function时从fcinfo中拿出ReturnSetInfo取到descplpgsql_estate_setup将取到的desc存入estate->rsi = rsi
                    #0  plpgsql_estate_setup (estate=0x7ffd81e2f850, func=0x2419028, rsi=0x7ffd81e2fb20, simple_eval_estate=0x0, simple_eval_resowner=0x0) at pl_exec.c:3972
                    #1  0x00007fe0a3992064 in plpgsql_exec_function (func=0x2419028, fcinfo=0x24da5a8, simple_eval_estate=0x0, simple_eval_resowner=0x0, procedure_resowner=0x0, atomic=true) at pl_exec.c:485
                    #2  0x00007fe0a39ac8f9 in plpgsql_call_handler (fcinfo=0x24da5a8) at pl_handler.c:277
                    #3  0x0000000000738829 in ExecMakeTableFunctionResult (setexpr=0x24e0b40, econtext=0x24e0a10, argContext=0x24da490, expectedDesc=0x24e1110, randomAccess=false) at execSRF.c:235
                    #4  0x0000000000753eed in FunctionNext (node=0x24e0800) at nodeFunctionscan.c:95
                    #5  0x000000000073a081 in ExecScanFetch (node=0x24e0800, accessMtd=0x753e3b <FunctionNext>, recheckMtd=0x754242 <FunctionRecheck>) at execScan.c:133
                    #6  0x000000000073a0f6 in ExecScan (node=0x24e0800, accessMtd=0x753e3b <FunctionNext>, recheckMtd=0x754242 <FunctionRecheck>) at execScan.c:182
                    #7  0x000000000075428c in ExecFunctionScan (pstate=0x24e0800) at nodeFunctionscan.c:270
                    #8  0x000000000073614e in ExecProcNodeFirst (node=0x24e0800) at execProcnode.c:464
                    #9  0x000000000072a08a in ExecProcNode (node=0x24e0800) at ../../../src/include/executor/executor.h:262
                    #10 0x000000000072cb80 in ExecutePlan (estate=0x24e05d8, planstate=0x24e0800, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x24d5910, execute_once=true) at execMain.c:1632
                    #11 0x000000000072a6d1 in standard_ExecutorRun (queryDesc=0x23f1248, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:364
                    #12 0x000000000072a50b in ExecutorRun (queryDesc=0x23f1248, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:308
                    #13 0x0000000000997ba9 in PortalRunSelect (portal=0x2474a28, forward=true, count=0, dest=0x24d5910) at pquery.c:924
                    #14 0x0000000000997867 in PortalRun (portal=0x2474a28, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x24d5910, altdest=0x24d5910, qc=0x7ffd81e300b0) at pquery.c:768
                    #15 0x0000000000991408 in exec_simple_query (query_string=0x23c9518 "select * from f1(42);") at postgres.c:1238
                    #16 0x0000000000995a3e in PostgresMain (dbname=0x2400998 "postgres", username=0x23c5178 "mingjie") at postgres.c:4563
                    #17 0x00000000008d3cfe in BackendRun (port=0x23f7220) at postmaster.c:4396
                    #18 0x00000000008d3697 in BackendStartup (port=0x23f7220) at postmaster.c:4124
                    #19 0x00000000008d00b8 in ServerLoop () at postmaster.c:1791
                    #20 0x00000000008cf98a in PostmasterMain (argc=1, argv=0x23c3120) at postmaster.c:1463
                    #21 0x00000000007ada4b in main (argc=1, argv=0x23c3120) at main.c:200

                    分析:

                    	if (estate->tuple_store == NULL)
                    		exec_init_tuple_store(estate);
                    	tupdesc = estate->tuple_store_desc;
                    	natts = tupdesc->natts;
                    	if (stmt->retvarno >= 0)
                    	{
                    		PLpgSQL_datum *retvar = estate->datums[stmt->retvarno];
                    		switch (retvar->dtype)
                    		{

                    初始化函数exec_init_tuple_store

                    static void
                    exec_init_tuple_store(PLpgSQL_execstate *estate)
                    {
                    	ReturnSetInfo *rsi = estate->rsi;
                    	MemoryContext oldcxt;
                    	ResourceOwner oldowner;
                    // 从"SPI Proc"切换到"ExecutorState"
                    	oldcxt = MemoryContextSwitchTo(estate->tuple_store_cxt);
                    // 从“Portal”切换到"Portal"	
                    	oldowner = CurrentResourceOwner;
                    	CurrentResourceOwner = estate->tuple_store_owner;
                    // 进入tuplestore_begin_heap函数
                    	estate->tuple_store =
                    		tuplestore_begin_heap(rsi->allowedModes & SFRM_Materialize_Random, false, work_mem);
                    	CurrentResourceOwner = oldowner;
                    	MemoryContextSwitchTo(oldcxt);
                    // 给estate添加DESC,rsi->expectedDesc的来源?
                    	estate->tuple_store_desc = rsi->expectedDesc;
                    }

                    进入tuplestore_begin_heap

                    Tuplestorestate *
                    tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
                    {
                    // 输入false不允许随机访问、false、8192
                    	Tuplestorestate *state;
                    	int			eflags;
                    // eflags = EXEC_FLAG_REWIND
                    	eflags = randomAccess ?
                    		(EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
                    		(EXEC_FLAG_REWIND);
                    // 进入tuple store模块开始初始化返回Tuplestorestate,注意他会直接拿当前的memcontext
                    	state = tuplestore_begin_common(eflags, interXact, maxKBytes);
                    // 返回的Tuplestorestate状态:
                    // state = {status = TSS_INMEM, eflags = 2, backward = false, interXact = false, 
                    //          truncated = false, availMem = 8372200, allowedMem = 8388608, tuples = 0,
                    //          myfile = 0x0, context = "ExecutorState", resowner = "Portal", copytup = 0x0, 
                    //          writetup = 0x0, readtup = 0x0, memtuples = 0x24f0d88, memtupdeleted = 0,
                    //          memtupcount = 0, memtupsize = 2048, growmemtuples = true, readptrs = 0x24e7a70, 
                    //          activeptr = 0, readptrcount = 1, readptrsize = 8, writepos_file = 0,writepos_offset = 0}
                    	state->copytup = copytup_heap;
                    	state->writetup = writetup_heap;
                    	state->readtup = readtup_heap;
                    	return state;
                    }

                    后面根据返回值的不同,进入几个分支。

                    在进入前,desc已经获取到了: tupdesc = estate->tuple_store_desc; natts = tupdesc->natts;

                    场景一:return>
                    			case PLPGSQL_DTYPE_VAR:
                    				{
                    					PLpgSQL_var *var = (PLpgSQL_var *) retvar;
                    					Datum		retval = var->value;
                    					bool		isNull = var->isnull;
                    					Form_pg_attribute attr = TupleDescAttr(tupdesc, 0);
                    					if (natts != 1)
                    						ereport(ERROR,
                    								(errcode(ERRCODE_DATATYPE_MISMATCH),
                    								 errmsg("wrong result type supplied in RETURN NEXT")));
                    					// retval是一个eoh的头,后续处理需要一个1be的头(1be的data部分指向eoh)
                    					retval = MakeExpandedObjectReadOnly(retval, isNull, var->datatype->typlen);
                    					// 转成需要的类型
                    					retval = exec_cast_value(estate,
                    											 retval,
                    											 &isNull,
                    											 var->datatype->typoid,
                    											 var->datatype->atttypmod,
                    											 attr->atttypid,
                    											 attr->atttypmod);
                    					tuplestore_putvalues(estate->tuple_store, tupdesc, &retval, &isNull);
                    				}
                    				break;

                    执行tuplestore_putvalues保存元组

                    void
                    tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
                    					 Datum *values, bool *isnull)
                    {
                    	MinimalTuple tuple;
                    	MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
                    	tuple = heap_form_minimal_tuple(tdesc, values, isnull);
                    // 记录使用了多少空间,修改state->availMem
                    	USEMEM(state, GetMemoryChunkSpace(tuple));
                    	tuplestore_puttuple_common(state, (void *) tuple);
                    	MemoryContextSwitchTo(oldcxt);
                    }
                    static void
                    tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
                    {
                    	TSReadPointer *readptr;
                    	int			i;
                    	ResourceOwner oldowner;
                    	state->tuples++;
                    	switch (state->status)
                    	{

                    内存态直接用数组缓存tuple,tuple使用的内存是在外层函数切换上下文申请的。

                    		case TSS_INMEM:
                    			readptr = state->readptrs;
                    			for (i = 0; i < state->readptrcount; readptr++, i++)
                    			{
                    				if (readptr->eof_reached && i != state->activeptr)
                    				{
                    					readptr->eof_reached = false;
                    					readptr->current = state->memtupcount;
                    				}
                    			}
                    			if (state->memtupcount >= state->memtupsize - 1)
                    			{
                    				(void) grow_memtuples(state);
                    			}
                    			state->memtuples[state->memtupcount++] = tuple;
                    			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
                    				return;
                    			PrepareTempTablespaces();
                    			oldowner = CurrentResourceOwner;
                    			CurrentResourceOwner = state->resowner;
                    			state->myfile = BufFileCreateTemp(state->interXact);
                    			CurrentResourceOwner = oldowner;
                    			state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
                    			state->status = TSS_WRITEFILE;
                    			dumptuples(state);
                    			break;
                    ...

                    场景二:return>
                    			case PLPGSQL_DTYPE_REC:
                    				{
                    					PLpgSQL_rec *rec = (PLpgSQL_rec *) retvar;
                    					TupleDesc	rec_tupdesc;
                    					TupleConversionMap *tupmap;

                    拿到record:

                    {dtype = PLPGSQL_DTYPE_REC, dno = 1, refname = 0x24db608 "r", lineno = 3, isconst = false, notnull = false, default_val = 0x0, datatype = {typname='foo'}, rectypeid = 17117, firstfield = -1, erh = 0x2509708}

                      数据和desc都在erh中,列名在firstfield指向的位置。数据类型在datatype中:foo数据类型oid在rectypeid中:17117->foo
                      					if (rec->erh == NULL)
                      						instantiate_empty_record_variable(estate, rec);
                      					if (ExpandedRecordIsEmpty(rec->erh))
                      						deconstruct_expanded_record(rec->erh);
                      // "SPI Proc"切到"ExprContext"
                      					oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
                      // return erh->er_tupdesc;
                      					rec_tupdesc = expanded_record_get_tupdesc(rec->erh);
                      // 从保存的desc:rec_tupdesc转换到输出的desc:tupdesc,第一步:生成转换map
                      					tupmap = convert_tuples_by_position(rec_tupdesc,
                      														tupdesc,
                      														gettext_noop("wrong record type supplied in RETURN NEXT"));
                      					tuple = expanded_record_get_tuple(rec->erh);
                      					if (tupmap)
                      // 从保存的desc:rec_tupdesc转换到输出的desc:tupdesc,第二步:用map生成转换后的元组
                      						tuple = execute_attr_map_tuple(tuple, tupmap);
                      // 缓存元组
                      					tuplestore_puttuple(estate->tuple_store, tuple);
                      					MemoryContextSwitchTo(oldcontext);
                      				}
                      				break;

                      场景三:return>

                      必须是两列以上的out参数,直接return next空,才会使用这段逻辑。

                      			case PLPGSQL_DTYPE_ROW:
                      				{
                      					PLpgSQL_row *row = (PLpgSQL_row *) retvar;
                      					oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
                      // 必须严格匹配tupdesc的类型,对不上则转换失败
                      					tuple = make_tuple_from_row(estate, row, tupdesc);
                      					if (tuple == NULL)
                      						ereport(ERROR,...)
                      					tuplestore_puttuple(estate->tuple_store, tuple);
                      					MemoryContextSwitchTo(oldcontext);
                      				}
                      				break;
                      			default:
                      				elog(ERROR, "unrecognized dtype: %d", retvar->dtype);
                      				break;
                      		}
                      	}

                      3>
                      drop function f1;
                      create or replace function f1(in i int, out j int) returns setof int as $$
                      begin
                        j := i+1;
                        return next;
                        j := i+2;
                        return next;
                        return;
                      end$$ language plpgsql;
                      select * from f1(42);
                      ----
                      CREATE TABLE foo (fooid INT, foosubid INT, fooname TEXT);
                      INSERT INTO foo VALUES (1, 2, 'three');
                      INSERT INTO foo VALUES (4, 5, 'six');
                      CREATE OR REPLACE FUNCTION get_all_foo() RETURNS SETOF foo AS
                      $BODY$
                      DECLARE
                          r foo%rowtype;
                      BEGIN
                          FOR r IN
                              SELECT * FROM foo WHERE fooid > 0
                          LOOP
                              -- can do some processing here
                              RETURN NEXT r; -- return current row of SELECT
                          END LOOP;
                          RETURN;
                      END;
                      $BODY$
                      LANGUAGE plpgsql;
                      SELECT * FROM get_all_foo();
                      --------
                      drop function f1(int);
                      create function f1(in i int, out j int, out k text) returns setof record as $$
                      begin
                        j := i+1;
                        k := 'foo';
                        return next;
                        j := j+1;
                        k := 'foot';
                        return next;
                        return;
                      end$$ language plpgsql;
                      select * from f1(42);

                      以上就是Postgresql源码分析returns setof函数oracle管道pipelined的详细内容,更多关于Postgresql returns setof函数的资料请关注易采站长站其它相关文章!

相关文章 大家在看