Oracle Advanced Queueing Limiting Concurrent Processes on a Queue

Question and Answer

Tom Kyte

Thanks for the question, Stephen.

Asked: April 15, 2014 - 6:36 pm UTC

Last updated: April 18, 2014 - 9:15 pm UTC

Version: 11.2.0

You Asked

Hi Tom,

I'm currently working on a project to migrate a custom queue into an Oracle Streams Advanced Queueing queue table and queue. My question is, when using a multiconsumer queue to hold data that is in constant flow (dequeue happens on enqueuing event), how can I limit the number of threads/processes that the dequeue or enqueue function can use at one time?

I believe it's possible, because the documentation here:

http://docs.oracle.com/cd/E11882_01/server.112/e11013/perform.htm

speaks of limiting concurrent processes, but it gives no example of how to do so, and I could not find one in the list of AQ (Advanced Queueing) examples.

If you like, I've devised an example setup of a multiconsumer queue with an event driven dequeue:

Create our user and give him the necessary privileges
--Also grant a quota on an existing tablespace
CREATE USER queuetester IDENTIFIED BY queuetest;

GRANT CONNECT, CREATE SESSION TO queuetester;

GRANT SELECT ANY TABLE TO queuetester;
GRANT INSERT ANY TABLE TO queuetester;
GRANT UPDATE ANY TABLE TO queuetester;
GRANT DELETE ANY TABLE TO queuetester;

GRANT aq_administrator_role TO queuetester;
GRANT CREATE TYPE TO queuetester;
GRANT CREATE TABLE TO queuetester;
GRANT CREATE ANY sequence TO queuetester;

GRANT CREATE ANY PROCEDURE TO queuetester;
GRANT EXECUTE ANY PROCEDURE TO queuetester;

GRANT EXECUTE ON dbms_aqadm TO queuetester;
GRANT EXECUTE ON dbms_aq TO queuetester;


Create our payload type
create or replace type testtyp as object
(
      queuelogid NUMERIC
    , from_address VARCHAR2(255)
    , to_address VARCHAR2(255)
    , subject VARCHAR2(255)
    , bodytext CLOB
    , create_date DATE
);


Create our queue table and a multiconsumer queue, then start the queue
begin
      DBMS_AQADM.CREATE_QUEUE_TABLE
      ( queue_table => 'qtabtest',
        queue_payload_type => 'testtyp',
        multiple_consumers => TRUE );

      DBMS_AQADM.CREATE_QUEUE
      ( queue_name => 'TEST_QUEUE',
        queue_table => 'qtabtest' );

      DBMS_AQADM.START_QUEUE
      ( queue_name => 'TEST_QUEUE' );
end;


Create the sequence and table used to record results of the dequeue
CREATE SEQUENCE queuelog START WITH 1 INCREMENT BY 1;

CREATE TABLE test_queue_log(
      queuelogid NUMERIC PRIMARY KEY
    , from_address VARCHAR2(255)
    , to_address VARCHAR2(255)
    , subject VARCHAR2(255)
    , bodytext CLOB
    , mail_create_date DATE
    , createdate DATE
    , status VARCHAR2(1)
);


Create the custom enqueue process
CREATE OR REPLACE PROCEDURE enqueuetest(p_from_address IN VARCHAR2,
                                        p_to_address   IN VARCHAR2,
                                        p_subject      IN VARCHAR2,
                                        p_bodytext     IN CLOB,
                                        p_create_date  IN DATE DEFAULT SYSDATE) IS

    enqueue_options    dbms_aq.enqueue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle     RAW(16);

    --Fans the single input message out to 1000 payloads, each with its own queuelogid
    CURSOR c_mail_list IS
        SELECT testtyp(queuelog.nextval, from_address, to_address, subject, bodytext, create_date) AS testtyp_select
        FROM (
                 SELECT c1.from_address AS from_address, c1.to_address AS to_address, c1.subject AS subject, c1.bodytext AS bodytext, c1.create_date AS create_date
                 FROM (
                          SELECT p_from_address AS from_address, p_to_address AS to_address, p_subject AS subject, p_bodytext AS bodytext, p_create_date AS create_date
                          FROM dual
                      ) c1
                 CROSS JOIN (
                                SELECT 1 + LEVEL AS num1
                                FROM dual
                                CONNECT BY LEVEL <= 1000
                            ) c2
             );

BEGIN

    FOR mail_rec IN c_mail_list
    LOOP

        sys.dbms_aq.enqueue(
            queue_name         => 'TEST_QUEUE',
            enqueue_options    => enqueue_options,
            message_properties => message_properties,
            payload            => mail_rec.testtyp_select,
            msgid              => message_handle
        );

        --Record each enqueued message so the dequeue side can mark it as sent
        INSERT INTO test_queue_log (
              queuelogid, from_address, to_address
            , subject, bodytext, mail_create_date
            , createdate, status
        )
        SELECT mail_rec.testtyp_select.queuelogid, mail_rec.testtyp_select.from_address, mail_rec.testtyp_select.to_address
             , mail_rec.testtyp_select.subject, mail_rec.testtyp_select.bodytext, mail_rec.testtyp_select.create_date
             , SYSDATE, 'U'--"Unsent"
        FROM dual;

        COMMIT;

    END LOOP;

END enqueuetest;


Create the custom dequeue process
create or replace PROCEDURE notifyCB( context  raw,
                                      reginfo  sys.aq$_reg_info,
                                      descr    sys.aq$_descriptor,
                                      payload  raw,
                                      payloadl number)
IS
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle     RAW(16);

    testuser testtyp;

BEGIN
    dequeue_options.msgid := descr.msg_id;
    dequeue_options.consumer_name := descr.consumer_name;

    DBMS_AQ.DEQUEUE(queue_name         => descr.queue_name,
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => testuser,
                    msgid              => message_handle);

    --Do something here that takes a little time if you want to see the queue dequeue slower

    update test_queue_log
    set status = 'S'--"Sent"
    where queuelogid = testuser.queuelogid;

END;


Create a subscriber for the dequeue procedure
begin
      dbms_aqadm.add_subscriber
     ( queue_name => 'TEST_QUEUE',
        subscriber => sys.aq$_agent( 'recipient', null, null ) );
end;


Register the subscriber to the queue
BEGIN
     dbms_aq.register
     ( sys.aq$_reg_info_list(
          sys.aq$_reg_info('TEST_QUEUE:RECIPIENT',
                           DBMS_AQ.NAMESPACE_AQ,
                          'plsql://queuetester.notifyCB',
                           HEXTORAW('FF')) ) ,
       1 );
end;


Sample enqueuing
BEGIN
    enqueuetest(p_from_address => 'me@work.com',
                p_to_address   => 'you@work.com',
                p_subject      => 'Test Email',
                p_bodytext     => 'Congratulations, the email sent!',
                p_create_date  => SYSDATE);
END;


Query to monitor remaining records in queue
SELECT COUNT(1) 
FROM test_queue_log 
WHERE status <> 'S';
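
The query above counts rows in the log table rather than messages in the queue itself. As a rough complement, the queue depth can be read from the AQ$ view that AQ creates alongside a queue table. This sketch assumes the queue table is named qtabtest as above, so the view is AQ$QTABTEST in the same schema; the column alias is illustrative.

```sql
-- Count messages per consumer and state for TEST_QUEUE.
-- AQ$QTABTEST is the view AQ generates for queue table QTABTEST.
SELECT consumer_name,
       msg_state,
       COUNT(*) AS messages
FROM aq$qtabtest
WHERE queue = 'TEST_QUEUE'
GROUP BY consumer_name, msg_state
ORDER BY consumer_name, msg_state;
```

Running it repeatedly while the callbacks work should show how quickly messages leave the READY state.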



and Tom said...

I asked about this and got this response from the AQ development manager:

<quote>

AQ PL/SQL Notification doesn't support an API or procedure that would allow its users to control or define the number of concurrent PL/SQL Notification dequeue processes. The reason being that in AQ messages can come in bursty fashion on any Database/RAC instance and having a fixed number of processes may not always work efficiently. Instead, AQ Notification automatically starts appropriate number of AQ Notification background processes (a.k.a EMON slaves) depending on the total workload and time taken to execute user PL/SQL callbacks.

</quote>


So, in short: the number of PL/SQL notification processes available at a given point in time is tuned automatically by design. It is not something we can influence directly.

However, that said, if you test/benchmark and find that the auto-tuning does not perform in a fashion that "works" for you, please contact support and reference bug 13744244 so they understand the context quickly.

Comments

Limiting AQ Processes

Kevin Seymour, April 18, 2014 - 2:44 pm UTC

There is a Metalink document about how to single-thread reads from a queue so that messages are processed in FIFO order. It might give you ideas on how to implement this yourself:

AQ/PLSQL Notifications do not maintain FIFO or Priority Ordering (Doc ID 225810.1)

If you had a queue and wanted only 5 processes at a time, you could have 5 different locks (LOCK_1, LOCK_2, etc). AQ could start 100 processes, but only 5 would be able to get a lock and actually dequeue messages.
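
A minimal sketch of the lock-slot idea, not taken from the Metalink note. It assumes the schema from the question (testtyp, TEST_QUEUE, test_queue_log) and that queuetester has been granted EXECUTE ON dbms_lock. The procedure name notifyCB_throttled, the lock names AQ_SLOT_n and the limit c_max_slots are made up for illustration. Because a callback that finds every slot taken returns without dequeuing, the slot holders drain the queue in a loop instead of dequeuing by msgid.

```sql
-- Hypothetical throttled callback; assumes GRANT EXECUTE ON dbms_lock TO queuetester.
CREATE OR REPLACE PROCEDURE notifyCB_throttled(
    context  RAW,
    reginfo  sys.aq$_reg_info,
    descr    sys.aq$_descriptor,
    payload  RAW,
    payloadl NUMBER)
IS
    c_max_slots CONSTANT PLS_INTEGER := 5;   -- illustrative concurrency cap

    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle     RAW(16);
    testuser           testtyp;

    lock_handle VARCHAR2(128);
    slot_handle VARCHAR2(128);               -- handle of the slot this session holds
    rc          INTEGER;

    e_queue_empty EXCEPTION;                 -- ORA-25228: nothing left to dequeue
    PRAGMA EXCEPTION_INIT(e_queue_empty, -25228);
BEGIN
    -- Try each slot once, without waiting.
    FOR i IN 1 .. c_max_slots LOOP
        -- ALLOCATE_UNIQUE commits; no work is pending at this point.
        dbms_lock.allocate_unique(lockname   => 'AQ_SLOT_' || i,
                                  lockhandle => lock_handle);
        rc := dbms_lock.request(lockhandle        => lock_handle,
                                lockmode          => dbms_lock.x_mode,
                                timeout           => 0,
                                release_on_commit => FALSE);
        IF rc IN (0, 4) THEN                 -- 0 = granted, 4 = already owned
            slot_handle := lock_handle;
            EXIT;
        END IF;
    END LOOP;

    IF slot_handle IS NULL THEN
        RETURN;                              -- all slots busy; a holder will drain the queue
    END IF;

    dequeue_options.consumer_name := descr.consumer_name;
    dequeue_options.wait          := dbms_aq.no_wait;

    BEGIN
        LOOP                                 -- drain until the queue is empty
            dbms_aq.dequeue(queue_name         => descr.queue_name,
                            dequeue_options    => dequeue_options,
                            message_properties => message_properties,
                            payload            => testuser,
                            msgid              => message_handle);

            UPDATE test_queue_log
            SET status = 'S'
            WHERE queuelogid = testuser.queuelogid;

            COMMIT;                          -- the lock survives (release_on_commit is FALSE)
        END LOOP;
    EXCEPTION
        WHEN e_queue_empty THEN
            NULL;
    END;

    rc := dbms_lock.release(slot_handle);
EXCEPTION
    WHEN OTHERS THEN
        IF slot_handle IS NOT NULL THEN
            rc := dbms_lock.release(slot_handle);
        END IF;
        RAISE;
END;
/
```

To try it, the registration would point at 'plsql://queuetester.notifyCB_throttled' instead of notifyCB. This only caps concurrency at the chosen limit; it cannot raise it if AQ starts fewer callbacks. A message that arrives just as the last slot holder finds the queue empty may also sit there until the next notification fires.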

Tom Kyte, April 18, 2014 - 9:15 pm UTC

But if you wanted 10 processes and AQ decided to start only 2, what then?

And why have 100 processes going when you only want five?

Use support and reference the bug number from above. There has been some work done on this. It requires setting parameters of a type I'm not willing to discuss.
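
If the aim is a fixed number of dequeuers regardless of how many notification processes AQ chooses to start, one alternative that is not discussed in this thread is to drop the PL/SQL notification and poll the queue from a fixed set of scheduler jobs. The following is a rough sketch under several assumptions: the schema from the question is in place, the registration for notifyCB has been removed with dbms_aq.unregister, and queuetester has been granted CREATE JOB. The names dequeue_worker and DEQUEUE_WORKER_n and the 30-second wait are hypothetical.

```sql
-- Hypothetical polling worker; assumes GRANT CREATE JOB TO queuetester.
CREATE OR REPLACE PROCEDURE dequeue_worker
IS
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle     RAW(16);
    testuser           testtyp;

    e_timeout EXCEPTION;                     -- ORA-25228: wait expired with no message
    PRAGMA EXCEPTION_INIT(e_timeout, -25228);
BEGIN
    dequeue_options.consumer_name := 'RECIPIENT';
    dequeue_options.wait          := 30;     -- block up to 30 seconds per attempt

    LOOP                                     -- runs until the job is stopped
        BEGIN
            dbms_aq.dequeue(queue_name         => 'TEST_QUEUE',
                            dequeue_options    => dequeue_options,
                            message_properties => message_properties,
                            payload            => testuser,
                            msgid              => message_handle);

            UPDATE test_queue_log
            SET status = 'S'
            WHERE queuelogid = testuser.queuelogid;

            COMMIT;
        EXCEPTION
            WHEN e_timeout THEN
                NULL;                        -- nothing arrived; wait again
        END;
    END LOOP;
END;
/

-- Start exactly five workers; the count is the concurrency limit.
BEGIN
    FOR i IN 1 .. 5 LOOP
        dbms_scheduler.create_job(
            job_name   => 'DEQUEUE_WORKER_' || i,
            job_type   => 'STORED_PROCEDURE',
            job_action => 'DEQUEUE_WORKER',
            enabled    => TRUE);
    END LOOP;
END;
/
```

The trade-off is that the workers hold sessions open even when the queue is idle, and they have to be stopped explicitly (for example with dbms_scheduler.stop_job). In exchange, the number of concurrent dequeuers is whatever number of jobs you create.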

Interesting information. Is there a rough timeframe?

Arian, September 16, 2014 - 8:08 pm UTC

Hi Tom,

thanks for your information so far. I understand the AQ team is still working on enhancing this part of the code. You're talking about database parameters to be set.

My question is: Can you give a (very) rough timeframe for this to be made available? Are we talking about 12cR1 or about 12c+ releases? In other words, do you expect a configurable number of threads by the end of 2015, or should we design our applications based on the current architecture?

Regards,
Arian
