  • Advanced Queuing: subscriber and dequeue


Question and Answer

Chris Saxon

Thanks for the question, Hans.

Asked: November 08, 2023 - 3:54 pm UTC

Last updated: November 24, 2023 - 5:56 pm UTC

Version: 19c

Viewed 1000+ times

You Asked

Hello T[o|ea]m,

Once, a long time ago, you gave an example of queuing a message asynchronously using PL/SQL.

https://asktom.oracle.com/pls/apex/f?p=100:11:0::::P11_QUESTION_ID:8760267539329

I implemented this example myself and got it working.
(I think) I understand the global principle of this example, but I certainly do not understand all implementation-details.
These are my questions:
- I suppose you chose the name of the procedure 'notifyCB' yourself, this is not a fixed or prescribed name, right?
- How did you choose the arguments for this procedure?
Are these fixed or prescribed?
- In the call dbms_aq.register this procedure (notifyCB) is specified, I do understand that and why.
But I do not understand the transfer of the arguments into the procedure notifyCB because a 'call' of notifyCB is not to be found. Is this why you used the word 'automagically'?
- English is not my native language (as you might have noticed).
I looked up in the Oracle References to find what a 'context' is. I hate these Oracle-texts and I still have no clue.
Can you explain this in plain English?
Why do you give it the value hextoraw('FF')?
Is this 'context' transferred to the notifyCB argument 'context'? (probably see previous question)
- Perhaps this question is already answered in (one of) the previous question(s):
Why does notifyCB have both an argument payload (raw) and payloadl (number)?
It looks like payloadl is not used.

I hope to reach the point where I can adapt this example to my specific situation while fully understanding it.

and Connor said...

"notifyCB" is your own name - you can choose what you want.

However, the parameters are fixed, because the callback mechanics demand it:

create or replace procedure notifyCB( context raw,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw,
payloadl number)

As per the docs:

If a notification message is expected for a RAW payload enqueue, then the PL/SQL callback must have the following signature:

procedure plsqlcallback(
context IN RAW,
reginfo IN SYS.AQ$_REG_INFO,
descr IN SYS.AQ$_DESCRIPTOR,
payload IN RAW,
payloadl IN NUMBER);

If the notification message is expected for an ADT payload enqueue, the PL/SQL callback must have the following signature:

procedure plsqlcallback(
context IN RAW,
reginfo IN SYS.AQ$_REG_INFO,
descr IN SYS.AQ$_DESCRIPTOR,
payload IN VARCHAR2,
payloadl IN NUMBER);


As for the "hextoraw", I suspect that back in version 9 that parameter was mandatory. Nowadays you could use null and that would be fine.
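To make the role of "context" concrete: the value supplied in sys.aq$_reg_info at registration time is what the notification hands back as the callback's first argument, so there is never an explicit call to the callback in your own code. A minimal sketch, assuming a hypothetical schema demo_owner, a hypothetical RAW-payload queue demo_q, a hypothetical log table demo_cb_log and a callback named demo_notifycb (none of these come from the original example):

```sql
-- Hypothetical log table, used only for this illustration.
create table demo_cb_log (
   logged_at   timestamp default systimestamp
 , ctx         raw(128)
 , queue_name  varchar2(300)
 , msg_id      raw(16)
);

create or replace procedure demo_notifycb(
   context   raw
 , reginfo   sys.aq$_reg_info
 , descr     sys.aq$_descriptor
 , payload   raw
 , payloadl  number
 )
is
   -- Copy the arguments into locals before using them in SQL.
   v_ctx         raw(128)      := context;
   v_queue_name  varchar2(300) := descr.queue_name;
   v_msg_id      raw(16)       := descr.msg_id;
begin
   -- Records what the notification passed in; it does not dequeue the message.
   insert into demo_cb_log (ctx, queue_name, msg_id)
   values (v_ctx, v_queue_name, v_msg_id);
   commit;
end demo_notifycb;
/

begin
   dbms_aq.register(
      reg_list  => sys.aq$_reg_info_list(
                      sys.aq$_reg_info(
                         name      => 'demo_owner.demo_q'
                       , namespace => dbms_aq.namespace_aq
                       , callback  => 'plsql://demo_owner.demo_notifycb'
                         -- Arbitrary tag; it should come back as "context" in the callback.
                       , context   => utl_raw.cast_to_raw('DEMO')
                       )
                   )
    , reg_count => 1
    );
end;
/
```

After a message is enqueued and committed, demo_cb_log.ctx should hold the same bytes that were registered. Registering the same procedure for several queues with different context values is one way to tell the notifications apart.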

A mate of mine, Mark Hoxey, wrote a more in-depth walkthrough of this here: https://markhoxey.wordpress.com/2016/03/01/asynchronous-processing-using-aq-callback/

which you might find more useful than the quick scripts Tom did back then.


Comments

used Mark Hoxey's example but ...

Hans Looyschelder, November 09, 2023 - 1:21 pm UTC

Your text made a lot clear to me. Also Mark Hoxey's text was very readable and seemed doable to me. I used his example to write the version for my specific situation. This did not go well yet. Firstly the message is queued, I can see the record in the queue table, but the message is not dequeued. Secondly, when I look into the record in the queue table I see the name of the exception queue table in the field Q_NAME and the name of queue in the field EXCEPTION_QUEUE. I expect this is not how it should be.
Does this information give you any clue? I would like to post my PL/SQL code; it's not very long, but I expect that it will be too long to copy into this text.
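One way to see where a message ended up is the AQ$ view that Oracle creates alongside each queue table. A minimal sketch, assuming the queue table is glk_owner.glk_t_pllr_qt as in the code posted in this thread and that the standard view AQ$GLK_T_PLLR_QT exists for it:

```sql
-- One row per message: its queue, state, number of failed dequeue attempts,
-- and the exception queue (if any) associated with it.
select queue
     , msg_id
     , msg_state
     , retry_count
     , enq_time
     , exception_queue
from   glk_owner.aq$glk_t_pllr_qt
order  by enq_time;
```

A climbing retry_count, or a message sitting in the exception queue, suggests that dequeue attempts are being rolled back rather than that the message was never picked up.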

The PL/SQL code for my situation

Hans Looyschelder, November 09, 2023 - 1:28 pm UTC

create or replace
type   glk_t_pllr_insert_values
as     object(
         values_correct         varchar2(1)
       , plpr_id                number(27)
       , pllr_logdoel           varchar2(30)
       , pllr_procesfase        varchar2(30)
       , pllr_run_startdate     varchar2(14)
       , pllr_thread_id         varchar2(15)
       , pllr_var_num_naam      varchar2(31)
       , pllr_var_num_waarde    varchar2(200)
       , pllr_var_char_naam     varchar2(31)
       , pllr_var_char_waarde   varchar2(250)
       , pllr_var_date_naam     varchar2(31)
       , pllr_var_date_waarde   varchar2(14)
       , pllr_verg_procesfase   varchar2(30)
       );
/
create or replace
type   glk_t_pllr_aq_payload
as     object(
         plpr_procesnaam        varchar2(50)
       , plpr_versie            varchar2(30)
       , pllr_logdoel           varchar2(30)
       , pllr_procesfase        varchar2(30)
       , pllr_run_startdate     varchar2(14)
       , pllr_thread_id         varchar2(15)
       , pllr_var_num_naam      varchar2(31)
       , pllr_var_num_waarde    varchar2(200)
       , pllr_var_char_naam     varchar2(31)
       , pllr_var_char_waarde   varchar2(250)
       , pllr_var_date_naam     varchar2(31)
       , pllr_var_date_waarde   varchar2(14)
       , pllr_verg_procesfase   varchar2(30)
       );
/
begin
   dbms_aqadm.create_queue_table(
                queue_table         => 'glk_owner.glk_t_pllr_qt'
              , queue_payload_type  => 'glk_owner.glk_t_pllr_aq_payload'
              , multiple_consumers  => false
              );
   dbms_aqadm.create_queue(
                queue_name          => 'glk_owner.glk_t_pllr_q'
              , queue_table         => 'glk_owner.glk_t_pllr_qt'
              );
   dbms_aqadm.start_queue(
                queue_name          => 'glk_owner.glk_t_pllr_q'
              );
end;
/
create or replace package glk_t_pllr_aq
as
   procedure enqueue_logregel(
               p_procesnaam      in varchar2
             , p_versie          in varchar2
             , p_logdoel         in varchar2
             , p_procesfase      in varchar2
             , p_run_startdate   in varchar2
             , p_thread_id       in varchar2
             , p_var_num_naam    in varchar2
             , p_var_num_waarde  in varchar2
             , p_var_char_naam   in varchar2
             , p_var_char_waarde in varchar2
             , p_var_date_naam   in varchar2
             , p_var_date_waarde in varchar2
             , p_verg_procesfas  in varchar2
             );
   procedure dequeue_logregel(
               context               raw
             , reginfo               sys.aq$_reg_info
             , descr                 sys.aq$_descriptor
             , payload               raw
             , payloadl              number
             );
end glk_t_pllr_aq;
/
create or replace package body glk_t_pllr_aq
as
   procedure enqueue_logregel(
               p_procesnaam      in varchar2
             , p_versie          in varchar2
             , p_logdoel         in varchar2
             , p_procesfase      in varchar2
             , p_run_startdate   in varchar2
             , p_thread_id       in varchar2
             , p_var_num_naam    in varchar2
             , p_var_num_waarde  in varchar2
             , p_var_char_naam   in varchar2
             , p_var_char_waarde in varchar2
             , p_var_date_naam   in varchar2
             , p_var_date_waarde in varchar2
             , p_verg_procesfas  in varchar2
             )
   is
      v_enqueue_options      dbms_aq.enqueue_options_t;
      v_message_properties   dbms_aq.message_properties_t;
      v_msgid                raw(16);
      v_payload              glk_owner.glk_t_pllr_aq_payload;
   begin
      v_payload := glk_owner.glk_t_pllr_aq_payload(
                               plpr_procesnaam      => p_procesnaam
                             , plpr_versie          => p_versie
                             , pllr_logdoel         => p_logdoel
                             , pllr_procesfase      => p_procesfase
                             , pllr_run_startdate   => p_run_startdate
                             , pllr_thread_id       => p_thread_id
                             , pllr_var_num_naam    => p_var_num_naam
                             , pllr_var_num_waarde  => p_var_num_waarde
                             , pllr_var_char_naam   => p_var_char_naam
                             , pllr_var_char_waarde => p_var_char_waarde
                             , pllr_var_date_naam   => p_var_date_naam
                             , pllr_var_date_waarde => p_var_date_waarde
                             , pllr_verg_procesfase => p_verg_procesfas
                             );
      dbms_aq.enqueue( queue_name         => 'glk_owner.glk_t_pllr_q'
                     , enqueue_options    => v_enqueue_options
                     , message_properties => v_message_properties
                     , payload            => v_payload
                     , msgid              => v_msgid
                     );
   end enqueue_logregel;
   procedure dequeue_logregel(
              context               raw
            , reginfo               sys.aq$_reg_info
            , descr                 sys.aq$_descriptor
            , payload               raw
            , payloadl              number
            )
   is
      v_rc_ok                  varchar2(1) := '0';
      v_dequeue_options        dbms_aq.dequeue_options_t;
      v_message_properties     dbms_aq.message_properties_t;
      v_message_handle         raw(16);
      v_payload                glk_owner.glk_t_pllr_aq_payload;
      v_insert_values          glk_owner.glk_t_pllr_insert_values;
      v_plpr_id                number(27);
   begin
      v_dequeue_options.msgid := descr.msg_id;
      v_dequeue_options.wait  := dbms_aq.no_wait;
      dbms_aq.dequeue(
                queue_name         => descr.queue_name
              , dequeue_options    => v_dequeue_options
              , message_properties => v_message_properties
              , payload            => v_payload
              , msgid              => v_message_handle
              );
      v_insert_values := glk_t_pllr_trg.checklogargs(
                                          fp_plpr_procesnaam      => v_payload.plpr_procesnaam
                                        , fp_plpr_versie          => v_payload.plpr_versie
                                        , fp_pllr_logdoel         => v_payload.pllr_logdoel
                                        , fp_pllr_procesfase      => v_payload.pllr_procesfase
                                        , fp_pllr_run_startdate   => v_payload.pllr_run_startdate
                                        , fp_pllr_thread_id       => v_payload.pllr_thread_id
                                        , fp_pllr_var_num_naam    => v_payload.pllr_var_num_naam
                                        , fp_pllr_var_num_waarde  => v_payload.pllr_var_num_waarde
                                        , fp_pllr_var_char_naam   => v_payload.pllr_var_char_naam
                                        , fp_pllr_var_char_waarde => v_payload.pllr_var_char_waarde
                                        , fp_pllr_var_date_naam   => v_payload.pllr_var_date_naam
                                        , fp_pllr_var_date_waarde => v_payload.pllr_var_date_waarde
                                        , fp_pllr_verg_procesfase => v_payload.pllr_verg_procesfase
                                        );
      if v_insert_values.values_correct = v_rc_ok
      then
         v_plpr_id := v_insert_values.plpr_id;
      else
         select plpr_id
         into   v_plpr_id
         from   glk_t_plpr
         where  1 = 1
         and    plpr_procesnaam = 'LogError'
         and    plpr_versie     = '0'
         ;
      end if;
      glk_t_pllr_pg.insert_row(
                      p_plpr_id              => v_plpr_id
                    , p_pllr_logdoel         => v_insert_values.pllr_logdoel
                    , p_pllr_procesfase      => v_insert_values.pllr_procesfase
                    , p_pllr_run_startdate   => v_insert_values.pllr_run_startdate
                    , p_pllr_thread_id       => v_insert_values.pllr_thread_id
                    , p_pllr_var_num_naam    => v_insert_values.pllr_var_num_naam
                    , p_pllr_var_num_waarde  => v_insert_values.pllr_var_num_waarde
                    , p_pllr_var_char_naam   => v_insert_values.pllr_var_char_naam
                    , p_pllr_var_char_waarde => v_insert_values.pllr_var_char_waarde
                    , p_pllr_var_date_naam   => v_insert_values.pllr_var_date_naam
                    , p_pllr_var_date_waarde => v_insert_values.pllr_var_date_waarde
                    , p_pllr_verg_procesfase => v_insert_values.pllr_verg_procesfase
                    );
      commit;
   end dequeue_logregel;
end glk_t_pllr_aq;
/
begin
   dbms_aq.register(
             reg_list  => sys.aq$_reg_info_list(
                                sys.aq$_reg_info(
                                      name                       => 'glk_owner.glk_t_pllr_q'
                                    , namespace                  => dbms_aq.namespace_aq
                                    , callback                   => 'plsql://glk_owner.glk_t_pllr_aq.dequeue_logregel'
                                    , context                    => null
                                    )
                              )
           , reg_count => 1
           );
end;
/

solved

Hans Looyschelder, November 15, 2023 - 6:14 pm UTC

Found it.
I am very used to getting error messages from Oracle.
But when something goes wrong in the callback procedure:
- nothing happens (the message is enqueued but not dequeued)
- you get no error messages at all. (I should have figured this out a bit sooner, sorry about that.)
I had to search the non-AQ part that I wrote and found why the dequeue did not work.
All that is shown in this question about Oracle AQ was spot on, it really helped me out, thanks!
(My second and third comments are (and, as I know now, were) not relevant and can be removed.)
Chris Saxon
November 17, 2023 - 10:58 am UTC

Great, glad you got it sorted.
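Because a failing callback reports nothing to the enqueuing session, it can help to have the callback record its own errors. A minimal sketch of one way to do that, assuming a hypothetical error table demo_cb_errors, a hypothetical logging procedure demo_log_cb_error and a placeholder callback demo_callback (none of these are part of the code posted above):

```sql
-- Hypothetical error table, used only for this illustration.
create table demo_cb_errors (
   logged_at   timestamp default systimestamp
 , msg_id      raw(16)
 , error_text  varchar2(4000)
);

create or replace procedure demo_log_cb_error(
   p_msg_id      in raw
 , p_error_text  in varchar2
 )
is
   -- Autonomous transaction: the log row is kept even if the caller rolls back.
   pragma autonomous_transaction;
begin
   insert into demo_cb_errors (msg_id, error_text)
   values (p_msg_id, substrb(p_error_text, 1, 4000));
   commit;
end demo_log_cb_error;
/

create or replace procedure demo_callback(
   context   raw
 , reginfo   sys.aq$_reg_info
 , descr     sys.aq$_descriptor
 , payload   raw
 , payloadl  number
 )
is
begin
   -- The real dequeue and processing logic would go here.
   null;
exception
   when others then
      -- Record the error stack and backtrace, then re-raise so the failure is not hidden.
      demo_log_cb_error(
         p_msg_id      => descr.msg_id
       , p_error_text  => dbms_utility.format_error_stack
                          || dbms_utility.format_error_backtrace
       );
      raise;
end demo_callback;
/
```

Querying demo_cb_errors after an enqueue that never gets dequeued should then show what went wrong inside the callback.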

dequeue sort order

Hans Looyschelder, November 19, 2023 - 3:16 pm UTC

Is there a way to enforce that messages are dequeued in the same order as they were enqueued? (And if yes, what is this way or can you point me to a readable text that explains this?)
Chris Saxon
November 20, 2023 - 1:47 pm UTC

AQ supports different mechanisms to control the order in which messages are processed. Applications can specify a ‘priority’ for each message at enqueue time, which can be used to control the order in which messages are consumed. Alternately, messages can also be sorted according to enqueue time or commit time to get a FIFO order for consuming the messages. Commit time is the time at which the transaction was committed, and this is especially useful when transactions are interdependent.

https://www.oracle.com/a/otn/docs/database/aq-db19c-technicalbrief.pdf

More discussion at:

https://docs.oracle.com/en/database/oracle/oracle-database/19/adque/aq-introduction.html#GUID-83F68756-B358-45ED-8C21-1DE6E87DCF7A__CHDIAGFG
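The sort order is fixed when the queue table is created, through the sort_list parameter of dbms_aqadm.create_queue_table. A minimal sketch, assuming hypothetical names demo_order_qt and demo_order_q in the current schema and a RAW payload for brevity:

```sql
begin
   dbms_aqadm.create_queue_table(
      queue_table        => 'demo_order_qt'
    , queue_payload_type => 'RAW'
      -- Priority first, then enqueue time. Other documented values include
      -- 'ENQ_TIME' and 'COMMIT_TIME'.
    , sort_list          => 'PRIORITY,ENQ_TIME'
    , multiple_consumers => false
    );
   dbms_aqadm.create_queue(
      queue_name  => 'demo_order_q'
    , queue_table => 'demo_order_qt'
    );
   dbms_aqadm.start_queue(queue_name => 'demo_order_q');
end;
/

declare
   v_enqueue_options     dbms_aq.enqueue_options_t;
   v_message_properties  dbms_aq.message_properties_t;
   v_msgid               raw(16);
begin
   -- A smaller number means a higher priority.
   v_message_properties.priority := 1;
   dbms_aq.enqueue(
      queue_name         => 'demo_order_q'
    , enqueue_options    => v_enqueue_options
    , message_properties => v_message_properties
    , payload            => utl_raw.cast_to_raw('urgent')
    , msgid              => v_msgid
    );
   commit;
end;
/
```

This setting governs which message a dequeue call returns next. It does not by itself serialize several sessions that dequeue and process messages at the same time.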

extra about dequeuing order

Hans Looyschelder, November 21, 2023 - 5:21 am UTC

Thank you for the answer. I tried both suggestions.
On https://docs.oracle.com/en/database/oracle/oracle-database/19/adque/aq-introduction.html#GUID-95868022-ECDA-4685-9D0A-52ED7663C84B I read: "Producers can enqueue messages in any sequence. Messages are not necessarily dequeued in the order in which they are enqueued."
In my situation I calculate the time elapsed between a message and the previous message of the same sort. I often get negative numbers. I can see that some messages take so long between enqueue and dequeue that another message, enqueued later, is sometimes dequeued sooner.
Is this something I have to accept, calculating the time differences later in a separate process, or should this really be solvable (in other words: did I make a programming mistake)?
Chris Saxon
November 21, 2023 - 10:19 am UTC

I've done some more digging, MOS note AQ/PLSQL Notifications Do Not Maintain FIFO or Priority Ordering (Doc ID 225810.1) has this to say on the topic:

In 11.1 onwards individual scheduler jobs are not submitted for each message instead a single scheduler job is started and extra jobs can be added depending on the load on the queue. When multiple jobs are handling the notifications associated with single queue essentially the same problem as pre-11.1 databases exists in that FIFO and/or priority based ordering is not maintained.

However, there is a work around that can be implemented in the callback procedure itself in order for it to maintain FIFO and/or priority based ordering; it is based on following premises:

1. A callback procedure processes all current messages on a queue, not just the message that the callback function was invoked for, i.e., dequeue the messages, and not use the information that is passed into the callback function.

2. A callback procedure is a point of serialisation, i.e., the process body of the callback function is mutually exclusive for all other invocations of that callback function.

3. No other process is to dequeue messages from the queue.


(Emphasis mine). It includes a lengthy demo of how to do this - check the note for full details.
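The three premises can be sketched roughly as follows. This is not the demo from the MOS note, only an illustration under stated assumptions: a RAW-payload single-consumer queue, a hypothetical callback name demo_drain_callback, a hypothetical lock name DEMO_DRAIN_CALLBACK, and execute privilege on dbms_lock for the owning schema. Note that dbms_lock.allocate_unique issues a commit.

```sql
create or replace procedure demo_drain_callback(
   context   raw
 , reginfo   sys.aq$_reg_info
 , descr     sys.aq$_descriptor
 , payload   raw
 , payloadl  number
 )
is
   -- ORA-25228: timeout or end-of-fetch, raised when no message is available.
   e_no_messages  exception;
   pragma exception_init(e_no_messages, -25228);

   v_lock_handle  varchar2(128);
   v_lock_result  integer;
   v_options      dbms_aq.dequeue_options_t;
   v_properties   dbms_aq.message_properties_t;
   v_msgid        raw(16);
   v_body         raw(32767);
begin
   -- Premise 2: serialise all invocations with a user lock.
   dbms_lock.allocate_unique(
      lockname   => 'DEMO_DRAIN_CALLBACK'
    , lockhandle => v_lock_handle
    );
   v_lock_result := dbms_lock.request(
                       lockhandle        => v_lock_handle
                     , lockmode          => dbms_lock.x_mode
                     , timeout           => dbms_lock.maxwait
                     , release_on_commit => false
                     );
   if v_lock_result not in (0, 4) then  -- 0 = success, 4 = already held
      raise_application_error(-20001, 'lock request failed: ' || v_lock_result);
   end if;

   -- Premise 1: ignore descr.msg_id and drain the queue in its own sort order.
   v_options.wait       := dbms_aq.no_wait;
   v_options.navigation := dbms_aq.first_message;
   loop
      begin
         dbms_aq.dequeue(
            queue_name         => descr.queue_name
          , dequeue_options    => v_options
          , message_properties => v_properties
          , payload            => v_body
          , msgid              => v_msgid
          );
      exception
         when e_no_messages then
            exit;  -- queue is empty
      end;
      -- Process v_body here, then commit this message.
      commit;
   end loop;

   v_lock_result := dbms_lock.release(v_lock_handle);
exception
   when others then
      if v_lock_handle is not null then
         v_lock_result := dbms_lock.release(v_lock_handle);
      end if;
      raise;
end demo_drain_callback;
/
```

Premise 3 is not something the code can enforce: it only holds if no other session dequeues from the same queue. An invocation that finds the queue already drained simply exits the loop on its first dequeue attempt.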

solved

Hans Looyschelder, November 24, 2023 - 8:38 am UTC

The solution shown in the document "MOS note AQ/PLSQL Notifications Do Not Maintain FIFO or Priority Ordering (Doc ID 225810.1)" works, I tried it out for myself.
After consulting the Oracle DBA in our organization I decided not to use it. Therefore I accept that log records will not be inserted chronologically and I will calculate the times between log events afterwards in a separate process.

Thanks a lot for the time & digging to help me out!
Chris Saxon
November 24, 2023 - 5:56 pm UTC

You're welcome.
