Hi Tom,
I'm currently working on a project to migrate a custom queue to an Oracle Streams Advanced Queuing (AQ) queue table and queue. My question is: when using a multiconsumer queue to hold data that is in constant flow (a dequeue is triggered by each enqueue event), how can I limit the number of threads/processes that the dequeue or enqueue operation can use at one time?
I believe it's possible, because the documentation here:
http://docs.oracle.com/cd/E11882_01/server.112/e11013/perform.htm speaks of limiting concurrent processes, but no example is given of how to do so, nor could I find one in the list of AQ (Advanced Queuing) examples.
If you like, I've devised an example setup of a multiconsumer queue with an event driven dequeue:
Create our user and grant it the necessary privileges. Also grant it a quota on an existing tablespace (no quota statement is included in this example):
-- Test account that owns every object created in this example.
-- Note: none of the statements below grants a tablespace quota.
CREATE USER queuetester IDENTIFIED BY queuetest;

-- Roles and session access.
GRANT
    CONNECT,
    CREATE SESSION,
    aq_administrator_role
TO queuetester;

-- Table DML privileges across all schemas.
GRANT
    SELECT ANY TABLE,
    INSERT ANY TABLE,
    UPDATE ANY TABLE,
    DELETE ANY TABLE
TO queuetester;

-- Privileges for creating and running the example's objects.
GRANT
    CREATE TYPE,
    CREATE TABLE,
    CREATE ANY SEQUENCE,
    CREATE ANY PROCEDURE,
    EXECUTE ANY PROCEDURE
TO queuetester;

-- Object privileges on the AQ packages (one object per GRANT statement).
GRANT EXECUTE ON dbms_aqadm TO queuetester;
GRANT EXECUTE ON dbms_aq TO queuetester;
-- Create our payload type: the object type carried by each queue message.
CREATE OR REPLACE TYPE testtyp AS OBJECT
(
    queuelogid   NUMERIC,         -- matched against test_queue_log.queuelogid by the dequeue callback
    from_address VARCHAR2(255),
    to_address   VARCHAR2(255),
    subject      VARCHAR2(255),
    bodytext     CLOB,
    create_date  DATE
);
/
-- Create our queue table and queue, then start the queue.
-- multiple_consumers => TRUE makes the queue table (and its queue) multiconsumer.
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'qtabtest',
        queue_payload_type => 'testtyp',
        multiple_consumers => TRUE
    );

    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => 'TEST_QUEUE',
        queue_table => 'qtabtest'
    );

    DBMS_AQADM.START_QUEUE(
        queue_name => 'TEST_QUEUE'
    );
END;
/
-- Create the sequence and table used to record results of the dequeue.
-- The sequence supplies queuelogid values for enqueued messages.
CREATE SEQUENCE queuelog START WITH 1 INCREMENT BY 1;

CREATE TABLE test_queue_log (
    queuelogid       NUMERIC PRIMARY KEY,
    from_address     VARCHAR2(255),
    to_address       VARCHAR2(255),
    subject          VARCHAR2(255),
    bodytext         CLOB,
    mail_create_date DATE,          -- create_date carried in the message payload
    createdate       DATE,          -- time the log row was inserted
    status           VARCHAR2(1)    -- 'U' = unsent (set on enqueue), 'S' = sent (set on dequeue)
);
-- Create the custom enqueue process.
--
-- enqueuetest: enqueues p_message_count identical mail messages onto
-- TEST_QUEUE and logs each one in test_queue_log with status 'U' (unsent).
--
-- Parameters:
--   p_from_address, p_to_address, p_subject, p_bodytext
--                     mail fields copied into every message payload
--   p_create_date     payload create_date (defaults to SYSDATE)
--   p_message_count   number of messages to enqueue; defaults to 1000, the
--                     count that was previously hard-coded. NULL or values
--                     below 1 enqueue nothing.
--
-- Each message is committed individually inside the loop.
CREATE OR REPLACE PROCEDURE enqueuetest(
    p_from_address  IN VARCHAR2,
    p_to_address    IN VARCHAR2,
    p_subject       IN VARCHAR2,
    p_bodytext      IN CLOB,
    p_create_date   IN DATE        DEFAULT SYSDATE,
    p_message_count IN PLS_INTEGER DEFAULT 1000
) IS
    l_enqueue_options    dbms_aq.enqueue_options_t;
    l_message_properties dbms_aq.message_properties_t;
    l_message_handle     RAW(16);
    l_queuelogid         NUMBER;
    l_payload            testtyp;
BEGIN
    FOR i IN 1 .. COALESCE(p_message_count, 0)
    LOOP
        -- One sequence value per message; it is the key the dequeue callback
        -- later uses to find the matching log row.
        l_queuelogid := queuelog.NEXTVAL;

        l_payload := testtyp(
            l_queuelogid,
            p_from_address,
            p_to_address,
            p_subject,
            p_bodytext,
            p_create_date
        );

        sys.dbms_aq.enqueue(
            queue_name         => 'TEST_QUEUE',
            enqueue_options    => l_enqueue_options,
            message_properties => l_message_properties,
            payload            => l_payload,
            msgid              => l_message_handle
        );

        INSERT INTO test_queue_log (
            queuelogid,
            from_address,
            to_address,
            subject,
            bodytext,
            mail_create_date,
            createdate,
            status
        )
        VALUES (
            l_payload.queuelogid,
            l_payload.from_address,
            l_payload.to_address,
            l_payload.subject,
            l_payload.bodytext,
            l_payload.create_date,
            SYSDATE,
            'U'  -- "Unsent"
        );

        -- Enqueue and log row are committed together, once per message.
        COMMIT;
    END LOOP;
END enqueuetest;
/
-- Create the custom dequeue process.
--
-- notifyCB: PL/SQL notification callback registered for TEST_QUEUE.
-- Dequeues the message identified by the notification descriptor and marks
-- the matching test_queue_log row as 'S' (sent).
--
-- Parameters:
--   descr     supplies msg_id, consumer_name and queue_name for the dequeue
--   context, reginfo, payload, payloadl
--             not read by this procedure
CREATE OR REPLACE PROCEDURE notifyCB(
    context  RAW,
    reginfo  sys.aq$_reg_info,
    descr    sys.aq$_descriptor,
    payload  RAW,
    payloadl NUMBER
)
IS
    l_dequeue_options    dbms_aq.dequeue_options_t;
    l_message_properties dbms_aq.message_properties_t;
    l_message_handle     RAW(16);
    l_message            testtyp;
BEGIN
    -- Dequeue exactly the message this notification was raised for,
    -- on behalf of the consumer named in the descriptor.
    l_dequeue_options.msgid         := descr.msg_id;
    l_dequeue_options.consumer_name := descr.consumer_name;

    DBMS_AQ.DEQUEUE(
        queue_name         => descr.queue_name,
        dequeue_options    => l_dequeue_options,
        message_properties => l_message_properties,
        payload            => l_message,
        msgid              => l_message_handle
    );

    -- Do something here that takes a little time if you want to see the queue dequeue slower

    UPDATE test_queue_log
    SET status = 'S'  -- "Sent"
    WHERE queuelogid = l_message.queuelogid;

    -- Make the dequeue and the status change permanent together, rather than
    -- relying on whatever the invoking notification job does at session end.
    COMMIT;
END notifyCB;
/
-- Create a subscriber for the dequeue procedure.
-- Adds the agent 'recipient' (no address, no protocol) as a subscriber of TEST_QUEUE.
BEGIN
    dbms_aqadm.add_subscriber(
        queue_name => 'TEST_QUEUE',
        subscriber => sys.aq$_agent('recipient', NULL, NULL)
    );
END;
/
-- Register the subscriber to the queue: ask AQ to invoke the PL/SQL
-- procedure queuetester.notifyCB for the RECIPIENT consumer of TEST_QUEUE.
BEGIN
    dbms_aq.register(
        sys.aq$_reg_info_list(
            sys.aq$_reg_info(
                'TEST_QUEUE:RECIPIENT',          -- queue:consumer being registered
                DBMS_AQ.NAMESPACE_AQ,
                'plsql://queuetester.notifyCB',  -- callback procedure
                HEXTORAW('FF')                   -- user context handed to the callback
            )
        ),
        1  -- number of entries in the registration list
    );
END;
/
-- Sample enqueuing: run the enqueue procedure once with test values.
BEGIN
    enqueuetest(
        p_from_address => 'me@work.com',
        p_to_address   => 'you@work.com',
        p_subject      => 'Test Email',
        p_bodytext     => 'Congratulations, the email sent!',
        p_create_date  => SYSDATE
    );
END;
/
-- Query to monitor remaining records in queue:
-- counts log rows whose status is not yet 'S' (sent).
SELECT COUNT(*) AS remaining_count
FROM test_queue_log
WHERE status <> 'S';