Skip to Main Content
  • Questions
  • Not able to dequeue messages automatically from Advanced Queue

Breadcrumb

Question and Answer

Connor McDonald

Thanks for the question, Karanvir.

Asked: May 17, 2016 - 2:22 am UTC

Last updated: October 29, 2024 - 3:36 am UTC

Version: Oracle Client 12.1.0.2.0

Viewed 10K+ times! This question is

You Asked

Hi I am new to Oracle Advanced queues. I was trying to queue a message and dequeue it automatically using a stored proc. I am able to dequeue my messages by manually running my stored proc but even though this stored proc is added as subscriber, I was of the view that it should automatically dequeue my messages. Please help me in fixing this. Please see my sample code below:

-- Create a message type
CREATE or replace type Message_typ as object (
subject VARCHAR2(30),
text VARCHAR2(4000));

/* Creating a object type queue table and queue: */
begin
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'MYQUEUETABLE',
multiple_consumers => true,
queue_payload_type => 'Message_typ');
end;
/

begin
DBMS_AQADM.CREATE_QUEUE (
queue_name => 'MY_MSG_QUEUE',
queue_table => 'MYQUEUETABLE'
);
end;
/
begin
DBMS_AQADM.START_QUEUE (
queue_name => 'MY_MSG_QUEUE');
end;

--Create a Subscriber Stored Proc
create or replace
PROCEDURE INSERTSAMPLEMESSAGEFROMQUEUE
as
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
message_handle RAW(16);
message message_typ;
no_messages exception;
pragma exception_init (no_messages, -25228);

BEGIN

dequeue_options.wait := DBMS_AQ.NO_WAIT;
dequeue_options.consumer_name := 'MySubscriber';
dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;

LOOP

DBMS_AQ.DEQUEUE(queue_name => 'MY_MSG_QUEUE',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);

INSERT INTO User_Defined_Table (Message)
SELECT
text as Message
from sampleTable where text = message.text;
COMMIT;
END LOOP;
EXCEPTION
WHEN no_messages THEN
DBMS_OUTPUT.PUT_LINE ('No more messages for processing');
COMMIT;
end;
/

--ADD Subscriber
/
BEGIN
DBMS_AQADM.ADD_SUBSCRIBER (
queue_name => 'MY_MSG_QUEUE',
subscriber => SYS.AQ$_AGENT(
'MySubscriber','MY_MSG_QUEUE',NULL)
);
DBMS_AQ.REGISTER (
SYS.AQ$_REG_INFO_LIST(
SYS.AQ$_REG_INFO(
'MY_MSG_QUEUE:MySubscriber',
DBMS_AQ.NAMESPACE_AQ,
'plsql://INSERTSAMPLEMESSAGEFROMQUEUE',
HEXTORAW('FF')
)
),
1
);
END;
/

--ENQUEUE A MESSAGE
DECLARE
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message message_typ;

begin
message := message_typ('Type',
'This is a test Message');

dbms_aq.enqueue(queue_name => 'MY_MSG_QUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);

commit;
end;
/


--**********************************
I thought by doing the above set up, when I enqueue the message, since my Proc is added as subscriber so it should have been automatically dequeuing my message, however that's not the case.
In order to dequeue it I have to manually execute my procedure.

execute CREATEASNSFROMPOS; -- By running this command I am able to dequeue my messages.

Please help me and let me know in case I am missing any step here in setting up my queue so that my proc is automatically dequeueing my messages and inserting those into a sample table.

Thanks in advance




and Chris said...

Your dequeue procedure needs to have the following parameters:

procedure plsqlcallback(
  context  IN  RAW, 
  reginfo  IN  SYS.AQ$_REG_INFO, 
  descr    IN  SYS.AQ$_DESCRIPTOR, 
  payload  IN  RAW,
  payloadl IN  NUMBER);


http://docs.oracle.com/database/121/ARPLS/d_aq.htm#ARPLS401

It should then use the values from descr to set the options.

This gives the following:

create or replace procedure insertsamplemessagefromqueue (
  context  raw,
  reginfo  sys.aq$_reg_info,
  descr    sys.aq$_descriptor,
  payload  raw,
  payloadl number
)
as
  dequeue_options dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle raw ( 16 ) ;
  message message_typ;
  no_messages exception;
  pragma exception_init ( no_messages, -25228 ) ;
begin

  dequeue_options.msgid := descr.msg_id;
  dequeue_options.consumer_name := descr.consumer_name;
  
  loop
    dbms_aq.dequeue ( 
      queue_name => descr.queue_name,--'MY_MSG_QUEUE', 
      dequeue_options => dequeue_options, 
      message_properties => message_properties, 
      payload => message, 
      msgid => message_handle 
    ) ;
    insert into user_defined_table ( message ) 
    values ( message.text );
    commit;
  end loop;
exception
when no_messages then
  dbms_output.put_line ( 'No more messages for processing' ) ;
  commit;
end;
/


With this in place it dequeues automatically:

SQL> select q_name, msgid from MYQUEUETABLE;

no rows selected

SQL> select * from user_defined_table;

no rows selected

SQL> --ENQUEUE A MESSAGE
SQL> declare
  2    enqueue_options dbms_aq.enqueue_options_t;
  3    message_properties dbms_aq.message_properties_t;
  4    message_handle raw ( 16 ) ;
  5    message message_typ;
  6  begin
  7    message := message_typ ( 'Type', 'This is a test Message' ) ;
  8    dbms_aq.enqueue (
  9      queue_name => 'MY_MSG_QUEUE',
 10      enqueue_options => enqueue_options,
 11      message_properties => message_properties,
 12      payload => message,
 13      msgid => message_handle ) ;
 14    commit;
 15  end;
 16  /

PL/SQL procedure successfully completed.

SQL> select q_name, msgid from MYQUEUETABLE;

Q_NAME                         MSGID
------------------------------ --------------------------------
MY_MSG_QUEUE                   3306F75DEB47203BE0531E30C40A550B

SQL> select * from user_defined_table;

MESSAGE
--------------------------------------------------------------------

This is a test Message


You may need to wait a few seconds before it's dequeued.

Further reading:

http://www.oracle-developer.net/display.php?id=411
https://asktom.oracle.com/pls/asktom/f?p=100:11:0::::P11_QUESTION_ID:8760267539329

Rating

  (4 ratings)

Is this answer out of date? If it is, please let us know via a Comment

Comments

Thanks a lot. You saved my day

Karanvir Sharma, May 17, 2016 - 12:42 pm UTC

Thanks a lot Chris.
It worked seamlessly fine.
And of course thanks for the useful links as well.

inconsistent behavior of DBMS_AQ.REGISTER process

Rajesh M, March 22, 2018 - 1:41 pm UTC

Hi Chris/Connors,

I've a similar issue in my sandbox env. I'm facing one problem with inconsistent behavior of DBMS_AQ.REGISTER process. Followed below steps:


1) Created a queue table (multi consumer)

2) Created & Started the Queue (provided all grants)

3) Crated a call back procedure (which will do the DEQUEUE and insert into relevant table)

4) Subscribed the agent for the above queue using DBMS_AQADM.ADD_SUBSCRIBER

5) Registered the call back procedure using DBMS_AQ.REGISTER.

6) ENQUEUE the message into queue for the given agent.

7) Expectation : It should automatically dequeue the message.



Step7 is working perfectly fine in DEV instance but not in SANDBOX. Not sure what's the reason, please suggest if any configuration/setup is required like AQ_TM_PROCESSES Parameter or any patches missing in my SANDBOX env?


you can find my code snippet here:

https://livesql.oracle.com/apex/livesql/file/content_GFTHM5TXRH0Y5W78YR4OFCN32.html



Here's my product version:

Oracle Database 11g Enterprise Edition Release 11.2.0.2.0 - 64bit Production

PL/SQL Release 11.2.0.2.0 - Production

CORE 11.2.0.2.0 Production

TNS for IBM/AIX RISC System/6000: Version 11.2.0.2.0 - Production

NLSRTL Version 11.2.0.2.0 - Production

db parameters for this to work

Darko, October 11, 2024 - 12:14 pm UTC

Hi, I followed the example (can send all my create queue statements) but the deque procedure is not called - records are in the queue table and never de-queued automatically. My DB has aq_tm_process set to 2 and job_queue_process is 60. Is anything else to look at ? Otherwise we have Java bean to process the queues, but I wanted DB to do everything.
Chris Saxon
October 15, 2024 - 12:22 pm UTC

What is your code and which version are you running on?

db parameters for this to work

Darko, October 18, 2024 - 7:35 am UTC

version of db is 19.0, this is the code
create table QUEUETEST
(
queuetest_id NUMBER
)
tablespace USER_DEFAULT_DATA;

-- Grant/Revoke object privileges
grant all on QUEUETEST to PUBLIC;


create or replace type lp_aq_adm.Message_typ as object(subject VARCHAR2(30), text VARCHAR2(80));
/
CREATE OR REPLACE PROCEDURE life_procs.commission_process_darko_callback (
context raw,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw,
payloadl number
)
IS
no_messages exception;
pragma exception_init ( no_messages, -25228 ) ;
l_dequeue_options dbms_aq.dequeue_options_t;
l_message_properties dbms_aq.message_properties_t;
v_message sys.aq$_jms_text_message;
v_message_clob CLOB;
v_msg_id RAW(16);

BEGIN
l_dequeue_options.msgid := descr.msg_id;
l_dequeue_options.consumer_name := descr.consumer_name;
LOOP
l_dequeue_options.wait := sys.dbms_aq.no_wait;
dbms_aq.dequeue(queue_name => 'LP_AQ_ADM.COMMISSION_PROCESS_DARKO_Q',
dequeue_options => l_dequeue_options,
message_properties => l_message_properties,
payload => v_message,
msgid => v_msg_id);
v_message.get_text(v_message_clob);
INSERT INTO darko_d.queuetest VALUES (1);
COMMIT;
END LOOP;
EXCEPTION
WHEN no_messages THEN COMMIT;

END commission_process_darko_callback;
/

GRANT EXECUTE ON lp_aq_adm.COMMISSION_PROCESS_DARKO_callback TO PUBLIC;

BEGIN
dbms_aqadm.stop_queue (queue_name => 'lp_aq_adm.COMMISSION_PROCESS_DARKO_Q');
dbms_aqadm.drop_queue (queue_name => 'lp_aq_adm.COMMISSION_PROCESS_DARKO_Q');
dbms_aqadm.drop_queue_table (queue_table => 'lp_aq_adm.COMMISSION_PROCESS_DARKO_QTBL');
END;

BEGIN
-- Create table COMMISSION_PROCESS_DARKO_Q
sys.dbms_aqadm.create_queue_table(queue_table => 'lp_aq_adm.commission_process_darko_qtbl',
queue_payload_type => 'LP_AQ_ADM.Message_typ',
multiple_consumers => true, ---- <<<
sort_list => 'ENQ_TIME',
compatible => '8.1.3',
primary_instance => 0,
secondary_instance => 0,
storage_clause => 'tablespace user_default_DATA pctfree 10 initrans 1 maxtrans 255 storage ( initial 1M next 1M minextents 1 maxextents unlimited pctincrease 0 )');

-- Create Queue COMMISSION_PROCESS_DARKO_Q
sys.dbms_aqadm.create_queue(queue_name => 'lp_aq_adm.COMMISSION_PROCESS_DARKO_Q',
queue_table => 'lp_aq_adm.COMMISSION_PROCESS_DARKO_QTBL',
queue_type => sys.dbms_aqadm.normal_queue,
max_retries => 5,
retry_delay => 0,
retention_time => 604800);
--start Queue
sys.dbms_aqadm.start_queue(queue_name => 'lp_aq_adm.COMMISSION_PROCESS_DARKO_Q');
END;


BEGIN
DBMS_AQADM.ADD_SUBSCRIBER (
queue_name => 'lp_aq_adm.COMMISSION_PROCESS_DARKO_Q',
subscriber => SYS.AQ$_AGENT(
'DarkoSubscriber',null,NULL)
);
DBMS_AQ.REGISTER (
SYS.AQ$_REG_INFO_LIST(
SYS.AQ$_REG_INFO(
'lp_aq_adm.COMMISSION_PROCESS_DARKO_Q:DarkoSubscriber',
DBMS_AQ.NAMESPACE_AQ,
'plsql://lp_aq_adm.COMMISSION_PROCESS_DARKO_callback',
HEXTORAW('FF')
)
),
1
);
END;


create or replace procedure lp_aq_adm.enqueue_msg(p_msg in varchar2,
p_add in varchar2 default null)
as
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message lp_aq_adm.message_typ;
recipients DBMS_AQ.aq$_recipient_list_t;
BEGIN
-- ADDED
-- SMD: here's where the parameter is used
recipients(1) := SYS.aq$_agent('RECIPIENT', p_add, null);
message_properties.recipient_list := recipients;

message := message_typ('NORMAL MESSAGE', p_msg );
dbms_aq.enqueue(queue_name => 'lp_aq_adm.COMMISSION_PROCESS_DARKO_Q',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
end;


begin lp_aq_adm.enqueue_msg('This is a test....'); commit; end;

select * from lp_aq_adm.commission_process_darko_qtbl;


Connor McDonald
October 29, 2024 - 3:36 am UTC

Sorry - are you saying you have *done* this and it now works, or just providing the sample code that does not work?

More to Explore

PL/SQL demos

Check out more PL/SQL tutorials on our LiveSQL tool.

PL/SQL docs

PL/SQL reference manual from the Oracle documentation library