Thanks for the question, Karanvir.
Asked: May 17, 2016 - 2:22 am UTC
Last updated: October 29, 2024 - 3:36 am UTC
Version: Oracle Client 12.1.0.2.0
Viewed 10K+ times! This question is
You Asked
Hi, I am new to Oracle Advanced Queuing. I am trying to enqueue a message and have it dequeued automatically by a stored procedure. I can dequeue my messages by running the stored procedure manually, but since the procedure is also added as a subscriber, I expected it to dequeue the messages automatically. Please help me fix this. My sample code is below:
-- Create a message type
-- Object type used as the queue payload: a short subject plus the message text.
create or replace type message_typ as object (
    subject varchar2(30),
    text    varchar2(4000)
);
-- SQL*Plus only executes CREATE TYPE when it reaches a slash; without it the
-- following anonymous block is appended to the same buffer and fails.
/
/* Creating a object type queue table and queue: */
begin
    -- Multi-consumer queue table whose rows carry Message_typ payloads.
    dbms_aqadm.create_queue_table(
        queue_table        => 'MYQUEUETABLE',
        queue_payload_type => 'Message_typ',
        multiple_consumers => true
    );
end;
/
begin
    -- Define the queue MY_MSG_QUEUE on top of the queue table MYQUEUETABLE.
    dbms_aqadm.create_queue(
        queue_table => 'MYQUEUETABLE',
        queue_name  => 'MY_MSG_QUEUE'
    );
end;
/
begin
    -- Start MY_MSG_QUEUE so that it accepts enqueue and dequeue calls.
    dbms_aqadm.start_queue(
        queue_name => 'MY_MSG_QUEUE'
    );
end;
-- The slash is required: without it SQL*Plus never runs this block and
-- instead appends the CREATE PROCEDURE that follows to the same buffer.
/
--Create a Subscriber Stored Proc
create or replace procedure insertsamplemessagefromqueue
is
    -- Drains MY_MSG_QUEUE for consumer 'MySubscriber'. For every message
    -- dequeued, rows of sampleTable whose text equals the message text are
    -- copied into User_Defined_Table. Stops when the queue has nothing left.

    -- ORA-25228: timeout or end-of-fetch during message dequeue.
    e_queue_empty exception;
    pragma exception_init(e_queue_empty, -25228);

    l_deq_opts  dbms_aq.dequeue_options_t;
    l_msg_props dbms_aq.message_properties_t;
    l_msg_id    raw(16);
    l_msg       message_typ;
begin
    l_deq_opts.consumer_name := 'MySubscriber';
    l_deq_opts.navigation    := dbms_aq.first_message;
    -- Do not block on an empty queue; raise ORA-25228 instead.
    l_deq_opts.wait          := dbms_aq.no_wait;

    loop
        dbms_aq.dequeue(
            queue_name         => 'MY_MSG_QUEUE',
            dequeue_options    => l_deq_opts,
            message_properties => l_msg_props,
            payload            => l_msg,
            msgid              => l_msg_id
        );

        insert into user_defined_table (message)
        select st.text as message
        from sampletable st
        where st.text = l_msg.text;

        -- One commit per message: makes the dequeue and the insert permanent together.
        commit;
    end loop;
exception
    when e_queue_empty then
        dbms_output.put_line('No more messages for processing');
        commit;
end;
/
--ADD Subscriber
-- (The stray slash that used to precede this block was removed: in SQL*Plus
-- it re-executes the previous buffer, i.e. recompiles the procedure.)
begin
    -- Add the agent 'MySubscriber' as a consumer of MY_MSG_QUEUE.
    -- NOTE(review): the second AQ$_AGENT attribute is the agent address; for a
    -- purely local subscriber it is normally NULL - confirm that passing the
    -- queue name here is intended.
    dbms_aqadm.add_subscriber(
        queue_name => 'MY_MSG_QUEUE',
        subscriber => sys.aq$_agent('MySubscriber', 'MY_MSG_QUEUE', null)
    );

    -- Register a PL/SQL notification for that queue:consumer pair so that
    -- INSERTSAMPLEMESSAGEFROMQUEUE is called back when a message arrives.
    dbms_aq.register(
        sys.aq$_reg_info_list(
            sys.aq$_reg_info(
                'MY_MSG_QUEUE:MySubscriber',            -- <queue>:<consumer> being watched
                dbms_aq.namespace_aq,
                'plsql://INSERTSAMPLEMESSAGEFROMQUEUE', -- callback procedure
                hextoraw('FF')                          -- user context handed to the callback
            )
        ),
        1  -- number of entries in the registration list
    );
end;
/
--ENQUEUE A MESSAGE
declare
    -- Enqueues one Message_typ test message on MY_MSG_QUEUE using default
    -- enqueue options and message properties, then commits.
    l_enq_opts  dbms_aq.enqueue_options_t;
    l_msg_props dbms_aq.message_properties_t;
    l_msg_id    raw(16);
    l_msg       message_typ := message_typ('Type', 'This is a test Message');
begin
    dbms_aq.enqueue(
        queue_name         => 'MY_MSG_QUEUE',
        enqueue_options    => l_enq_opts,
        message_properties => l_msg_props,
        payload            => l_msg,
        msgid              => l_msg_id
    );

    -- The message only becomes visible to consumers once the enqueue is committed.
    commit;
end;
/
--**********************************
I thought that with the above setup, since my procedure is added as a subscriber, it would automatically dequeue the message as soon as I enqueue it; however, that's not the case.
In order to dequeue it I have to manually execute my procedure.
execute INSERTSAMPLEMESSAGEFROMQUEUE; -- By running this command I am able to dequeue my messages.
Please help me and let me know in case I am missing any step here in setting up my queue so that my proc is automatically dequeueing my messages and inserting those into a sample table.
Thanks in advance
and Chris said...
Your dequeue procedure needs to have the following parameters:
-- Signature a procedure must have to be used as a PL/SQL notification
-- callback (the target of a 'plsql://...' registration).
procedure plsqlcallback(
context IN RAW, -- presumably the user context supplied at registration (HEXTORAW('FF') in the REGISTER call) - confirm in the DBMS_AQ docs
reginfo IN SYS.AQ$_REG_INFO, -- the registration information for this notification
descr IN SYS.AQ$_DESCRIPTOR, -- describes the notified message; exposes msg_id, consumer_name and queue_name
payload IN RAW, -- presumably the raw payload, relevant for RAW queues only - confirm
payloadl IN NUMBER); -- presumably the length of payload - confirm
See the DBMS_AQ documentation: http://docs.oracle.com/database/121/ARPLS/d_aq.htm#ARPLS401
It should then use the values from descr to set the dequeue options.
This gives the following:
create or replace procedure insertsamplemessagefromqueue (
    context  raw,
    reginfo  sys.aq$_reg_info,
    descr    sys.aq$_descriptor,
    payload  raw,
    payloadl number
)
as
    -- PL/SQL notification callback for MY_MSG_QUEUE.
    --
    -- Called once per notified message. Dequeues exactly the message named
    -- in the descriptor and stores its text in user_defined_table.
    --
    -- Parameters (fixed by the callback contract):
    --   context, reginfo, payload, payloadl - unused here
    --   descr - supplies msg_id, consumer_name and queue_name of the message
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle     raw(16);
    message            message_typ;

    -- ORA-25228: timeout or end-of-fetch during message dequeue.
    no_messages exception;
    pragma exception_init(no_messages, -25228);
begin
    -- Target the one message this notification is about.
    dequeue_options.msgid         := descr.msg_id;
    dequeue_options.consumer_name := descr.consumer_name;
    -- Never block the notification job if the message is no longer available.
    dequeue_options.wait          := dbms_aq.no_wait;

    -- No loop: with msgid pinned, a second dequeue would only ask again for
    -- the message that was just consumed.
    dbms_aq.dequeue(
        queue_name         => descr.queue_name,
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle
    );

    insert into user_defined_table (message)
    values (message.text);

    -- Commit the dequeue and the insert as one unit of work.
    commit;
exception
    when no_messages then
        -- Nothing was dequeued, so there is nothing to commit.
        dbms_output.put_line('No more messages for processing');
end;
/
With this in place it dequeues automatically:
SQL> select q_name, msgid from MYQUEUETABLE;
no rows selected
SQL> select * from user_defined_table;
no rows selected
SQL> --ENQUEUE A MESSAGE
SQL> declare
2 enqueue_options dbms_aq.enqueue_options_t;
3 message_properties dbms_aq.message_properties_t;
4 message_handle raw ( 16 ) ;
5 message message_typ;
6 begin
7 message := message_typ ( 'Type', 'This is a test Message' ) ;
8 dbms_aq.enqueue (
9 queue_name => 'MY_MSG_QUEUE',
10 enqueue_options => enqueue_options,
11 message_properties => message_properties,
12 payload => message,
13 msgid => message_handle ) ;
14 commit;
15 end;
16 /
PL/SQL procedure successfully completed.
SQL> select q_name, msgid from MYQUEUETABLE;
Q_NAME MSGID
------------------------------ --------------------------------
MY_MSG_QUEUE 3306F75DEB47203BE0531E30C40A550B
SQL> select * from user_defined_table;
MESSAGE
--------------------------------------------------------------------
This is a test Message
You may need to wait a few seconds before it's dequeued.
Further reading:
http://www.oracle-developer.net/display.php?id=411
https://asktom.oracle.com/pls/asktom/f?p=100:11:0::::P11_QUESTION_ID:8760267539329
Rating
(4 ratings)
Is this answer out of date? If it is, please let us know via a Comment