Pipelined table functions are table functions that return or "pipe" rows back to the calling query as the function produces the data in the desired form - and before the function has completed all processing!
Before diving into the implementation and application of pipelined table functions, it is important to understand how unusual the above statement is. PL/SQL is not a multi-threaded language. Generally, when a PL/SQL block (anonymous, nested, procedure, function, etc.) is invoked, further processing in that session is "on hold" (suspended) until the block returns control to the host that invoked the block - whether it be another PL/SQL block, a SQL statement, or a host language, such as Java.
Normal (non-pipelined) table functions act in precisely this way. Each time the table function is invoked (either once or, with left correlations, once for each row of the table on the left in the FROM clause), the SQL engine must wait until a RETURN statement is executed to pass back the collection to the SELECT for conversion into rows and columns.
This blocking behavior can have a negative impact on overall performance of the SELECT statement, especially in ETL (extract-transform-load) operations of a data warehouse. In addition, with each element added to the collection in the table function, more and more Program Global Area (PGA or session) memory is consumed. For very large datasets, this can lead to further performance degradation and even errors.
Pipelined table functions get around both these problems. Let's take a look!
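Let's start our exploration of pipelined table functions (which I will also refer to as PTFs in this tutorial) with about as simple an example as you can get. The code below creates a nested table type for the function to return, defines a function with the PIPELINED keyword that sends a single row back with PIPE ROW and then passes back nothing but control with an unadorned RETURN, and finally calls that function in the FROM clause of a SELECT.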
CREATE OR REPLACE TYPE strings_t IS TABLE OF VARCHAR2 (100);
/
CREATE OR REPLACE FUNCTION strings
RETURN strings_t PIPELINED
AUTHID DEFINER
IS
BEGIN
PIPE ROW ('abc');
RETURN;
END;
/
SELECT COLUMN_VALUE my_string FROM TABLE (strings())
/
/* And in 12.2 and higher, no need for TABLE */
SELECT COLUMN_VALUE my_string FROM strings()
/
What about executing this function in PL/SQL?
DECLARE
l_strings strings_t := strings_t();
BEGIN
l_strings := strings ();
END;
/
You will see this error:
PLS-00653: aggregate/table functions are not allowed in PL/SQL scope
This makes a lot of sense. PL/SQL is not a multi-threaded language. It cannot accept rows that are "piped" back before the function terminates execution.
To be very clear: non-pipelined table functions can be invoked natively in PL/SQL, since they follow the standard model in PL/SQL: execute all code until hitting the RETURN statement or an exception is raised. But when you go with PIPELINED, you give up the ability to call the function in PL/SQL.
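To make the contrast concrete, here is a minimal sketch that assumes the strings_t type and the pipelined strings function created above are still in place. The function name strings_np is hypothetical, introduced only for this illustration. The block first calls the non-pipelined function natively, then reaches the pipelined function the only way PL/SQL can: through a SELECT.

```sql
/* Hypothetical non-pipelined counterpart of strings (the name is an assumption) */
CREATE OR REPLACE FUNCTION strings_np
   RETURN strings_t
   AUTHID DEFINER
IS
   l_strings   strings_t := strings_t ();
BEGIN
   /* Build the whole collection first, then pass it back in one go */
   l_strings.EXTEND;
   l_strings (l_strings.LAST) := 'abc';
   RETURN l_strings;
END;
/

DECLARE
   l_strings   strings_t;
BEGIN
   /* Native PL/SQL call: allowed, because strings_np is not pipelined */
   l_strings := strings_np ();
   DBMS_OUTPUT.put_line ('Non-pipelined count: ' || l_strings.COUNT);

   /* The pipelined function has to go through the SQL engine */
   SELECT COLUMN_VALUE
     BULK COLLECT INTO l_strings
     FROM TABLE (strings ());

   DBMS_OUTPUT.put_line ('Pipelined count: ' || l_strings.COUNT);
END;
/
```

Keep in mind that BULK COLLECT loads every piped row into a PL/SQL collection, so this workaround gives back the memory savings that pipelining is meant to provide. It is reasonable for small result sets only.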
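Of course, in this incredibly basic pipelined table function, there will be no issue of blocking or memory consumption. But it gets across the basic elements of a pipelined table function: the PIPELINED keyword in the function header, the PIPE ROW statement to send each row back to the calling query, and a RETURN statement that passes back nothing but control.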
Pipelined table functions can help improve performance over a non-pipelined table function, and also reduce PGA memory consumption. Let's take a closer look.
Run the code below to create the database objects needed to culminate in a non-pipelined table function performing the stocks-to-tickers transformation covered in Module 3 (Streaming Table Functions).
DROP TYPE ticker_ot FORCE;
DROP TYPE tickers_nt FORCE;
DROP TYPE stock_ot FORCE;
DROP TYPE stocks_nt FORCE;
DROP TABLE stocks;
DROP TABLE tickers;
CREATE TABLE stocks
(
ticker VARCHAR2 (20),
trade_date DATE,
open_price NUMBER,
close_price NUMBER
)
/
/* Load up 10000 rows - when running in your own database, you might want to
use a higher volume of data here, to see a more dramatic difference in the
elapsed time and memory utilization
*/
INSERT INTO stocks
SELECT 'STK' || LEVEL,
SYSDATE,
LEVEL,
LEVEL + 15
FROM DUAL
CONNECT BY LEVEL <= 10000
/
CREATE TABLE tickers
(
ticker VARCHAR2 (20),
pricedate DATE,
pricetype VARCHAR2 (1),
price NUMBER
)
/
CREATE TYPE ticker_ot AS OBJECT
(
ticker VARCHAR2 (20),
pricedate DATE,
pricetype VARCHAR2 (1),
price NUMBER
);
/
CREATE TYPE tickers_nt AS TABLE OF ticker_ot;
/
CREATE OR REPLACE PACKAGE stock_mgr
AUTHID DEFINER
IS
TYPE stocks_rc IS REF CURSOR RETURN stocks%ROWTYPE;
END stock_mgr;
/
CREATE OR REPLACE FUNCTION doubled_nopl (rows_in stock_mgr.stocks_rc)
RETURN tickers_nt
AUTHID DEFINER
IS
TYPE stocks_aat IS TABLE OF stocks%ROWTYPE INDEX BY PLS_INTEGER;
l_stocks stocks_aat;
l_doubled tickers_nt := tickers_nt ();
BEGIN
LOOP
FETCH rows_in BULK COLLECT INTO l_stocks LIMIT 100;
EXIT WHEN l_stocks.COUNT = 0;
FOR l_row IN 1 .. l_stocks.COUNT
LOOP
l_doubled.EXTEND;
l_doubled (l_doubled.LAST) :=
ticker_ot (l_stocks (l_row).ticker,
l_stocks (l_row).trade_date,
'O',
l_stocks (l_row).open_price);
l_doubled.EXTEND;
l_doubled (l_doubled.LAST) :=
ticker_ot (l_stocks (l_row).ticker,
l_stocks (l_row).trade_date,
'C',
l_stocks (l_row).close_price);
END LOOP;
END LOOP;
RETURN l_doubled;
END;
/
Now let's create a pipelined version of the above function.
Note that you do not need to explicitly close the cursor variable; Oracle will automatically close a cursor variable created with the CURSOR expression when the table function terminates.
CREATE OR REPLACE FUNCTION doubled_pl (rows_in stock_mgr.stocks_rc)
RETURN tickers_nt
PIPELINED
AUTHID DEFINER
IS
TYPE stocks_aat IS TABLE OF stocks%ROWTYPE INDEX BY PLS_INTEGER;
l_stocks stocks_aat;
BEGIN
LOOP
FETCH rows_in BULK COLLECT INTO l_stocks LIMIT 100;
EXIT WHEN l_stocks.COUNT = 0;
FOR l_row IN 1 .. l_stocks.COUNT
LOOP
PIPE ROW (ticker_ot (l_stocks (l_row).ticker,
l_stocks (l_row).trade_date,
'O',
l_stocks (l_row).open_price));
PIPE ROW (ticker_ot (l_stocks (l_row).ticker,
l_stocks (l_row).trade_date,
'C',
l_stocks (l_row).close_price));
END LOOP;
END LOOP;
RETURN;
END;
/
Before exploring the impact on performance and memory, let's verify that this pipelined table function can be used in a SELECT just like the non-pipelined version.
SELECT COUNT(*) FROM tickers
/
INSERT INTO tickers
SELECT *
FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks)))
/
SELECT * FROM tickers
WHERE ROWNUM < 20
/
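As an additional sanity check, the following sketch compares the output of the two functions directly, assuming the stocks table, doubled_pl and doubled_nopl were all created as shown above. Both counts should be twice the number of rows in stocks, and the MINUS query should return no rows if the two functions produce the same data.

```sql
/* Row count produced by the pipelined version */
SELECT COUNT (*) pipelined_rows
  FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks)))
/

/* Row count produced by the non-pipelined version */
SELECT COUNT (*) non_pipelined_rows
  FROM TABLE (doubled_nopl (CURSOR (SELECT * FROM stocks)))
/

/* Any rows piped by doubled_pl that doubled_nopl does not return */
SELECT * FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks)))
MINUS
SELECT * FROM TABLE (doubled_nopl (CURSOR (SELECT * FROM stocks)))
/
```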
So now let's prove to you that the SQL engine is able to take those piped rows and put them immediately to work!
CREATE OR REPLACE PACKAGE utils
IS
PROCEDURE initialize (context_in IN VARCHAR2);
PROCEDURE show_results (message_in IN VARCHAR2 := NULL);
END;
/
CREATE OR REPLACE PACKAGE BODY utils
IS
last_timing INTEGER := NULL;
last_pga INTEGER := NULL;
FUNCTION pga_consumed
RETURN NUMBER
AS
l_pga NUMBER;
BEGIN
SELECT st.VALUE
INTO l_pga
FROM v$mystat st, v$statname sn
WHERE st.statistic# = sn.statistic# AND sn.name = 'session pga memory';
RETURN l_pga;
END;
PROCEDURE initialize (context_in IN VARCHAR2)
IS
BEGIN
DELETE FROM tickers;
COMMIT;
DBMS_OUTPUT.put_line (context_in);
last_timing := DBMS_UTILITY.get_time;
last_pga := pga_consumed;
END;
PROCEDURE show_results (message_in IN VARCHAR2 := NULL)
IS
l_count INTEGER;
BEGIN
SELECT COUNT (*) INTO l_count FROM tickers;
DBMS_OUTPUT.put_line ('Ticker row count: ' || l_count);
DBMS_OUTPUT.put_line (
'"' || message_in || '" completed in: ' ||
TO_CHAR (DBMS_UTILITY.get_time - last_timing)||' centisecs; pga at: '||
TO_CHAR (pga_consumed() - last_pga) || ' bytes');
END;
END;
/
BEGIN
utils.initialize ('Pipelined');
INSERT INTO tickers
SELECT *
FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks)))
WHERE ROWNUM < 10;
utils.show_results ('First 9 rows');
utils.initialize ('Not Pipelined');
INSERT INTO tickers
SELECT *
FROM TABLE (doubled_nopl (CURSOR (SELECT * FROM stocks)))
WHERE ROWNUM < 10;
utils.show_results ('First 9 rows');
END;
/
The results shown below are for an initial load of 200,000 rows into the stocks table. That volume of data is not supported in LiveSQL sessions, so the results you see here will be less dramatic, but the pattern is the same.
The significantly faster response time with the pipelined function demonstrates clearly that the INSERT-SELECT statement was able to keep track of rows returned by the function. As soon as nine rows were passed back, the SQL engine terminated execution of the pipelined table function and inserted the rows.
With the non-pipelined version, we have to wait for every row in stocks to be doubled (10,000 rows into 20,000 with the load script above), consuming lots of Program Global Area memory as well. Then all those rows are passed back to the SELECT statement, at which point the SQL engine says "Well, I just wanted the first 9." and throws away the rest.
Very inefficient.
From a memory standpoint, the non-pipelined table function consumes much more PGA memory than the pipelined version.
Pipelined
"First 9 rows" completed in: 8 centisecs; pga at: 528345 bytes
Ticker row count: 9
Not Pipelined
"First 9 rows" completed in: 93 centisecs; pga at: 96206848 bytes
Ticker row count: 9
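The test above stops after nine rows. To compare the two functions when every row is consumed, a variation along the following lines could be run. It is a sketch that reuses the utils package and the two functions exactly as defined above; only the ROWNUM filter is removed and the message text changed. No results are shown here, since they depend on your data volume and environment.

```sql
BEGIN
   /* Full load through the pipelined function */
   utils.initialize ('Pipelined');

   INSERT INTO tickers
      SELECT *
        FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks)));

   utils.show_results ('All rows');

   /* Full load through the non-pipelined function */
   utils.initialize ('Not Pipelined');

   INSERT INTO tickers
      SELECT *
        FROM TABLE (doubled_nopl (CURSOR (SELECT * FROM stocks)));

   utils.show_results ('All rows');
END;
/
```

Because utils.initialize deletes all rows from tickers and commits, each measurement starts from an empty table.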
Sometimes (as in the performance and memory test in the previous section) the calling query will stop fetching before all rows have been piped back. Oracle will then raise the NO_DATA_NEEDED exception inside the function. This will terminate the function, but will not terminate the SELECT statement that called it. You do need to explicitly handle this exception if either of the following applies: the block containing the PIPE ROW statement has an OTHERS exception handler, or the code that feeds the PIPE ROW statement must be followed by cleanup logic (typically, releasing resources the function no longer needs).
Let's explore this behavior in more detail. In this first example, the query asks for only 1 row, so Oracle raises NO_DATA_NEEDED inside the function, but no error is propagated to the query.
CREATE OR REPLACE TYPE strings_t IS TABLE OF VARCHAR2 (100);
/
CREATE OR REPLACE FUNCTION strings
RETURN strings_t
PIPELINED
AUTHID DEFINER
IS
BEGIN
PIPE ROW (1);
PIPE ROW (2);
RETURN;
END;
/
SELECT COLUMN_VALUE my_string
FROM TABLE (strings ())
WHERE ROWNUM < 2
/
Now I add an OTHERS exception handler and nothing else.
CREATE OR REPLACE FUNCTION strings
RETURN strings_t
PIPELINED
AUTHID DEFINER
IS
BEGIN
PIPE ROW (1);
PIPE ROW (2);
RETURN;
EXCEPTION
WHEN OTHERS
THEN
DBMS_OUTPUT.put_line ('Error: ' || SQLERRM);
RAISE;
END;
/
SELECT COLUMN_VALUE my_string
FROM TABLE (strings ())
WHERE ROWNUM < 2
/
BEGIN
DBMS_OUTPUT.put_line ('Flush output cache!');
END;
/
As you can see, the NO_DATA_NEEDED error is trapped by that handler, and the re-raise does not manifest as an error in the SELECT statement. The problem with taking this approach, though, is that your OTHERS handler might contain specific cleanup code that makes sense for unexpected failures, but not for an early termination of data piping. So the recommendation is to provide a specific handler for NO_DATA_NEEDED.
CREATE OR REPLACE FUNCTION strings
RETURN strings_t
PIPELINED
AUTHID DEFINER
IS
BEGIN
PIPE ROW (1);
PIPE ROW (2);
RETURN;
EXCEPTION
WHEN no_data_needed
THEN
RAISE;
WHEN OTHERS
THEN
/* Clean up code here! */
RAISE;
END;
/
SELECT COLUMN_VALUE my_string
FROM TABLE (strings ())
WHERE ROWNUM < 2
/
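When the function has to release resources no matter how it ends, the same pattern extends naturally. The sketch below is illustrative only: the function name strings_cleanup and the local cleanup procedure are hypothetical, and the cleanup body merely writes a message where real code would release whatever the function acquired. It assumes the strings_t type created earlier.

```sql
CREATE OR REPLACE FUNCTION strings_cleanup
   RETURN strings_t
   PIPELINED
   AUTHID DEFINER
IS
   /* Hypothetical cleanup step; a real one would release resources */
   PROCEDURE cleanup
   IS
   BEGIN
      DBMS_OUTPUT.put_line ('Cleanup executed');
   END;
BEGIN
   FOR indx IN 1 .. 5
   LOOP
      PIPE ROW ('row ' || indx);
   END LOOP;

   /* Normal completion: every row was consumed */
   cleanup;
   RETURN;
EXCEPTION
   WHEN no_data_needed
   THEN
      /* The query stopped fetching early: clean up, then re-raise */
      cleanup;
      RAISE;
   WHEN OTHERS
   THEN
      /* Unexpected failure: clean up, then propagate the error */
      cleanup;
      RAISE;
END;
/

SELECT COLUMN_VALUE my_string
  FROM TABLE (strings_cleanup ())
 WHERE ROWNUM < 3
/

BEGIN
   DBMS_OUTPUT.put_line ('Flush output cache!');
END;
/
```

Keeping the cleanup in a local procedure means the normal path, the early-termination path and the failure path all run the same code, while only a genuine error reaches the calling query.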
In the code below, I demonstrate that both NO_DATA_FOUND and NO_DATA_NEEDED are by default "hidden" from the calling query, but other exceptions like PROGRAM_ERROR result in termination of the SQL statement.
CREATE OR REPLACE FUNCTION strings (err_in IN VARCHAR2)
RETURN strings_t
PIPELINED
AUTHID DEFINER
IS
BEGIN
PIPE ROW (1);
CASE err_in
WHEN 'no_data_found'
THEN
RAISE NO_DATA_FOUND;
WHEN 'no_data_needed'
THEN
RAISE no_data_needed;
WHEN 'program_error'
THEN
RAISE PROGRAM_ERROR;
END CASE;
RETURN;
END;
/
SELECT COLUMN_VALUE my_string FROM TABLE (strings ('no_data_found'))
/
SELECT COLUMN_VALUE my_string FROM TABLE (strings ('no_data_needed'))
/
SELECT COLUMN_VALUE my_string FROM TABLE (strings ('program_error'))
/
The basic takeaway regarding NO_DATA_NEEDED is: don't worry about it, unless you are providing a WHEN OTHERS handler in your pipelined table function. In that case, make sure to provide a handler for NO_DATA_NEEDED, in which you will simply re-raise the exception with a RAISE; statement.
The return type of a pipelined table function does not have to be a schema-level type. You can also declare the collection type in a package specification, even basing it on a table's %ROWTYPE, as in the following code.
CREATE TABLE stocks2
(
ticker VARCHAR2 (20),
trade_date DATE,
open_price NUMBER,
close_price NUMBER
)
/
CREATE OR REPLACE PACKAGE pkg
AUTHID DEFINER
AS
TYPE stocks_nt IS TABLE OF stocks2%ROWTYPE;
FUNCTION stock_rows
RETURN stocks_nt
PIPELINED;
END;
/
CREATE OR REPLACE PACKAGE BODY pkg
AS
FUNCTION stock_rows
RETURN stocks_nt
PIPELINED
IS
l_stock stocks2%ROWTYPE;
BEGIN
l_stock.ticker := 'ORCL';
l_stock.open_price := 100;
PIPE ROW (l_stock);
RETURN;
END;
END;
/
SELECT ticker, open_price FROM TABLE (pkg.stock_rows ())
/
This is more convenient, but behind the scenes, Oracle Database is creating types implicitly for you, as you can see below.
SELECT object_name, object_type
FROM user_objects
WHERE object_type IN ('TYPE', 'PACKAGE', 'PACKAGE BODY')
/
Pipelined table functions are something of an oddity in PL/SQL: they pass data back to the calling query, even before the function is completed; they don't pass back anything but control with the RETURN statement; you cannot call a PTF from within PL/SQL itself, only in the FROM clause of a SELECT.
But those oddities reflect the power of this special type of function: improved performance and reduced memory consumption, over normal (non-pipelined) table functions.
This tutorial covers the fundamental features and behavior of pipelined table functions. You may also want to explore parallel-enabling pipelined table functions. Follow the links at the bottom of this tutorial for more details on this topic.