Skip to Main Content
  • Questions
  • writing a stand-alone application to continuously monitor a database queue (AQ)

Breadcrumb

Question and Answer

Connor McDonald

Thanks for the question.

Asked: August 11, 2000 - 6:55 pm UTC

Last updated: February 19, 2019 - 5:59 am UTC

Version: oracle server 8.1.5 enterprise edition

Viewed 1000+ times

You Asked

Hi Tom,
A question regarding oracle AQ...
I wish to write a small stand-alone application that would *constantly* monitor a queue (only one queue) for the arrival of a message and as soon as a mesage arrives, take some action. I figured I could use the DBMS_AQ.DEQUEUE with wait option of FOREVER (the default). What's unclear to me however, is that would this application be running as a daemon all the time in the background? Or is there a way to register the application name with the queuing system so that any time a message arrives in thte queue it executes this application.

Your help is appreciated...

and Tom said...



The way queues work is that you would start your process externally (eg: I put them in my startup scripts so that after the database is started -- the daemons get started). It is upto you to get them going and keep them going. We do not manage the "dequeue" processes (which may or may not even be on the same machine with the database)



Rating

  (13 ratings)

Is this answer out of date? If it is, please let us know via a Comment

Comments

Multiple consumers

Quenton, January 21, 2003 - 1:53 pm UTC

I would like to create an application pool to dequeue messages. Based on the answer above, I would have to start my application multiple times (say 10) to fill the pool (which could be on a different machine than my database). Each consumer in the pool would have a different aq$_agent.name. Now when a producer enqueues a message it must define which consumer the message will be directed to.

I am reasonably confident that this scenario is correct. My question: 'Is there a mechanism or API available that would enable my producers to distribute the messages evenly across all consumers?'

Tom Kyte
January 21, 2003 - 2:31 pm UTC

why would you do it like that? just have multiple dequeuers on the same queue and don't make anyone direct a message to a specific dequeuer.

AQ is built do to that last part -- it is what it does.

Multiple Identical Dequeuers

Quenton, January 21, 2003 - 5:16 pm UTC

If I have multiple identical dequeuers (the same process started many times) on the same queue how do I prevent them from attempting to handle the same message?

The following excerpts from the "Application Developer's Guide - Advanced Queuing" have me a little confused.

On page 7-3 udner Basic Queuing — Many Producers, Many Consumers of Discrete Messages
it says "In this next stage, many producers may enqueue messages, each message being processed by a different consumer depending on type and correlation identifier.
All of my consumers would have the same type and correlation wouldn't they?

Page 8-63 under Multiple Recipients
it says "A message is considered PROCESSED only when all intended consumers have successfully dequeued the message."
Since all of my consumers are identical won't they all have to process the message before it can be dequeued?

Tom Kyte
January 21, 2003 - 7:27 pm UTC

that -- is what AQ is about.  It does the "at least once, at most once" processing.  AQ is all about allowing multiple consumers feed off of a single queue table -- at the same time, without blocking.

this is what AQ is <b>all about</b>....

Do this (stolen right from the AQ app developers guide basically)

set echo on

connect / as sysdba

DROP USER aq CASCADE;
CREATE USER aq IDENTIFIED BY aq;
GRANT EXECUTE ON DBMS_AQADM TO aq;
GRANT Aq_administrator_role TO aq;
GRANT CONNECT, RESOURCE TO aq;
GRANT EXECUTE ON dbms_aq TO aq;
connect aq/aq


CREATE type aq.Message_typ as object ( subject VARCHAR2(30), text VARCHAR2(80));
/


EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'aq.objmsgs80_qtab', queue_payload_type =>  'aq.Message_typ' );
EXECUTE DBMS_AQADM.CREATE_QUEUE ( queue_name => 'MSG_QUEUE', queue_table => 'aq.objmsgs80_qtab');
EXECUTE DBMS_AQADM.START_QUEUE (  queue_name => 'MSG_QUEUE');

create procedure enqueue_msg( p_msg in varchar2 )
as
 enqueue_options dbms_aq.enqueue_options_t;
 message_properties dbms_aq.message_properties_t;
 message_handle RAW(16);
 message aq.message_typ;
BEGIN
   message := message_typ('NORMAL MESSAGE',  p_msg );
   dbms_aq.enqueue(queue_name => 'msg_queue', enqueue_options => enqueue_options,
                   message_properties => message_properties, payload => message,
                   msgid => message_handle);
   COMMIT;
end;
/

create procedure dequeue_msg
as
 dequeue_options dbms_aq.dequeue_options_t;
 message_properties dbms_aq.message_properties_t;
 message_handle RAW(16);
 message aq.message_typ;
BEGIN
   DBMS_AQ.DEQUEUE(queue_name => 'msg_queue', dequeue_options => dequeue_options,
                   message_properties => message_properties,
                   payload => message, msgid => message_handle);

   DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text );
   COMMIT;
END;
/


Ok, now, open 3 separate windows.  I'm using unix (linux) so I wrote a small script:

[tkyte@tkyte-pc-isdn tkyte]$ cat dq.csh
#!/bin/csh -f

while (1)
sqlplus aq/aq <<EOF
exec dequeue_msg
exit
EOF
end
[tkyte@tkyte-pc-isdn tkyte]$


In 2 of the 3 windows -- I ran that.  They each block on the dequeue.

Now, in the other window do this:


sqlplus aq/aq

SQL> exec enqueue_msg( 'Hello' );


and you'll see ONE of them get it, process it and go back for more.  Do it again and you'll probably see the OTHER one get it, process it and go back for more (or the first one will get it again).

Now, put that into a loop

begin
   for i in 1 .. 50
   loop
     equeue_msg( 'hello ' || i );
   end loop;
end;
/


and just watch them goto town.



This is what AQ does!



 

Excellent Example

Quenton, January 22, 2003 - 11:42 am UTC

Thanks, thats exactly what I needed to know.

Enqueue and Dequeue ADT with Pro*C

Robin, January 29, 2003 - 5:30 pm UTC

I am trying to enqueue and dequeue messages using Pro*C. I am following the example provided by Pro*C/C++ Application Developer's Guide Release 8.1.6.
I have defined my payload as follows;
create or replace type xml_pyld_typ AS OBJECT (xml_pyld CLOB);

Here is the code I am using
EXEC SQL BEGIN DECLARE SECTION;

struct XML_PYLD_TYP
{
OCIClobLocator * xml_pyld;
};
typedef struct XML_PYLD_TYP XML_PYLD_TYP;

XML_PYLD_TYP *message = (XML_PYLD_TYP*)0; /* payload */
char user[60]="aquser/aquser@aq"; /* user logon password */
char subject[30]; /* components of the */
char txt[80]; /* payload type */
OCIClobLocator *a_clob; /* CLOB payload */

EXEC SQL END DECLARE SECTION;

EXEC SQL CONNECT :user;


/* Allocate memory for the host variable from the object cache : */
EXEC SQL ALLOCATE :message;

/* ENQUEUE */

strcpy(subject, "NORMAL ENQUEUE");
strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");

/* Initialize the components of message : */
EXEC SQL OBJECT SET xml_pyld OF :message TO :a_clob;

/* Embedded PLSQL call to the AQ enqueue procedure : */
EXEC SQL EXECUTE
DECLARE
message_properties dbms_aq.message_properties_t;
enqueue_options dbms_aq.enqueue_options_t;
msgid RAW(16);

BEGIN
/* Bind the host variable 'message' to the payload: */
dbms_aq.enqueue('xmlmsg_q',
enqueue_options,
message_properties,
:message,
msgid);
END;
END-EXEC;


I am getting the precompiler error "...PLS-S-00306, wrong number or types of arguments in call to 'ENQUEUE'".

I have not been able to find any other Pro*C examples using ADT and CLOB's. Can you provide an example of enqueing/dequeueing ADT messages containing CLOBs using Pro*C on an 8.1.7 database?

Tom Kyte
January 30, 2003 - 8:37 am UTC

Enqueue and Dequeue ADT with Pro*C using CLOB's

Robin, January 30, 2003 - 11:56 am UTC

This is the example I am working with, but I have modified it to use CLOB's within the Object payload type. The problem is I cannot enqueue or dequeue messages with the CLOB type in Pro*C.

Are there any Pro*C examples of enqueueing and dequeuing using CLOB's?

Tom Kyte
January 30, 2003 - 11:58 am UTC

don't know -- you might try metalink.

OCIClobLocator...

Mark A. Williams, January 30, 2003 - 12:55 pm UTC

I could be way off base here - it has been a long time since I've done any real coding! Anyway, here's an observation that may be relevant:

a_clob is:

OCIClobLocator *a_clob; /* CLOB payload */

Then the xml_pyld attribute of the XML_PYLD_TYP struct is set to a_clob:

EXEC SQL OBJECT SET xml_pyld OF :message TO :a_clob;

but what does a_clob point to at this point?

It doesn't look like it has been initialized. Doing a 'EXEC SQL ALLOCATE :a_clob' should allocate the lob locator and initialize it to empty.

Again, don't know if that has anything to do with your problem or not - just wanted to through it out there.

- Mark

I can't spell...

Mark A. Williams, January 30, 2003 - 12:57 pm UTC

Meant to write "throw" not "through" in that review.

Doh!

Tom Kyte
January 30, 2003 - 1:04 pm UTC

then/than
through/throw
from/form

i do that all of the time...

OCIClobLocator...

Robin, January 30, 2003 - 2:08 pm UTC

Thanks for the followup. Tried your suggestion, but still have the same error.
I welcome any other suggestions you have.

Thanks again.

Leaky Abstractions in AQ ?

Norbert Klamann, September 15, 2006 - 7:52 am UTC

We are implementing an AQ based solution now and we think that it is rather cumbersome to dequeue with a separate OS process, like Tom showed with his shell-script.

In this case we try to use a database function and must resort to the means of the Operating System. It is good that this is possible, but it is bad that it seems to be required.

What is missing is the possibilty to start a procedure in the database as a background process whicht detaches from the current session.

Or am I missing something ?


Tom Kyte
September 15, 2006 - 8:58 am UTC

dbms_scheduler can do that (run external processes)

java stored procedures can do that (run external processes)

Enqueue messages with priority

Rae, February 09, 2007 - 6:34 am UTC

Hi Tom,

I created a sample script to Enqueue and Dequeue Messages by Priority Using PL/SQL. I based it on the link you provided:
http://download-west.oracle.com/docs/cd/A87860_01/doc/appdev.817/a76938/adq_aex3.htm

----------------------------------
/* Enqueue two messages with priority 7 and 5: */
DECLARE
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message AQ_APPS_USR.message_typ;

BEGIN
message := message_typ('PRIORITY MESSAGE',
'Enqueued at priority 7.');

message_properties.priority := 7;

DBMS_AQ.ENQUEUE(queue_name => 'msg_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);

message := message_typ('PRIORITY MESSAGE',
'Enqueued at priority 5.');

message_properties.priority := 5;

DBMS_AQ.ENQUEUE(queue_name => 'msg_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
END;

/* Dequeue from priority queue: */
DECLARE
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
message_handle RAW(16);
message AQ_APPS_USR.message_typ;

BEGIN
dequeue_options.navigation := DBMS_AQ.first_message;
DBMS_AQ.DEQUEUE(queue_name => 'msg_queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);

DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject ||
' ... ' || message.text );

COMMIT;
dequeue_options.navigation := DBMS_AQ.first_message;
DBMS_AQ.DEQUEUE(queue_name => 'msg_queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);

DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject ||
' ... ' || message.text );

COMMIT;
END;

----------------------------------
This is the output after running DBMS_AQ.DEQUEUE:
Message: PRIORITY MESSAGE ... Enqueued at priority 7.
Message: PRIORITY MESSAGE ... Enqueued at priority 5.
----------------------------------

I thought the expected output is that priority 5 should be displayed first. Can you enlighten me on this?

I'm not sure if this info is helpful to you:

SQL> SELECT compatible FROM dba_queue_tables WHERE owner = 'AQ_APPS_USR';

COMPATIBLE
----------
8.0.3

Thanks,
Rae

Enqueue messages with priority

A reader, February 12, 2007 - 4:56 am UTC

Hi Tom,

Pls ignore my previous question. I did not create the Prioritized Message Queue Table that's why priority is not working during dequeue.

I have a new question though.
Can you tell me or give me links that say what are the advantages of AQ.

Thanks,
Rae

Performance of dequeue

jock, February 17, 2019 - 10:29 am UTC

In your answer where you illustrated use of 2 DEQUEUE windows, I have a couple of performance questions;

1) Whilst the DEQUEUE is in sleep mode (WAIT FOREVER), I can see the session is waiting on the event - "Streams AQ: waiting for messages in the queue", that is an IDLE wait class and not actually consuming ANY resources, correct ?

2) When a message is queued, does AQ Enqueue process wake up both DEQUEUE windows ? So you would take the performance HIT that both would attempt to SELECT the message, one will succeed, the other will fail and return nothing ?
Connor McDonald
February 18, 2019 - 2:19 am UTC

1) Whilst the DEQUEUE is in sleep mode (WAIT FOREVER), I can see the session is waiting on the event - "Streams AQ: waiting for messages in the queue", that is an IDLE wait class and not actually consuming ANY resources, correct ?


Correct. It's similar to waiting on a row lock on a table. You just "sit there"


2) When a message is queued, does AQ Enqueue process wake up both DEQUEUE windows ? So you would take the performance HIT that both would attempt to SELECT the message, one will succeed, the other will fail and return nothing ?

The other would not fail, it was stay "in the wait", ie, your application would be *unaware* that it got a signal, had a look but found nothing. From the application perspective it is still just sitting in dequeue mode

Dequeue performance

jock, February 18, 2019 - 8:10 am UTC

Thanks Connor for quick response.

Regarding 2) - sorry, I meant found nothing, not failed. However, I'm more concerned about the performance hit. For a test, I spun up 1 ENQUEUE window with 180,000 messages + 8 DEQUEUE windows, and noticed executions (740,000+!!) of this DEQUEUE SELECT statement in the AWR go through the roof;

select /*+ INDEX(TAB AQ$_MYQTAB_I) */ tab.rowid...

I found that 2 DEQUEUE windows actually performs better than 8, at least for my test message pattern.

Am I missing a trick here ? Is there a better approach than playing about with the number of DEQUEUE windows ?
Connor McDonald
February 19, 2019 - 5:59 am UTC

For me, the most benefits I've seen when you want to agressively dequeue is using batch dequeue. Some pseudo-code here

declare
   g_queue               varchar2(30) := 'MY_QUEUE';
   g_queue_array_size    int := 50;
   g_timeout             int := 60;
   l_message             my_array;
   l_message_properties  dbms_aq.message_properties_array_t;

   l_dequeue_options     dbms_aq.dequeue_options_t;
   l_message_properties  dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t();
   l_message             my_array := my_array();
   l_message_cnt         pls_integer;
   l_msgid               dbms_aq.msgid_array_t := dbms_aq.msgid_array_t();

   e_queue_timeout    exception;
   pragma exception_init(e_queue_timeout,-25228);

begin
   p_message_properties := l_message_properties;
   p_message            := l_message;
   
   l_message.extend(g_queue_array_size);
   l_message_properties.extend(g_queue_array_size);

   l_dequeue_options.wait := nvl(g_timeout,10);
   l_dequeue_options.navigation := dbms_aq.first_message;

   l_message_cnt :=
       dbms_aq.dequeue_array(
                   queue_name               => g_queue,
                   dequeue_options          => l_dequeue_options,
                   array_size               => g_queue_array_size,
                   message_properties_array => l_message_properties,
                   payload_array            => l_message,
                   msgid_array              => l_msgid);

exception
   when e_queue_timeout then
      ...   
end;


More to Explore

DBMS_AQ

More on PL/SQL routine DBMS_AQ here