Use Case: We are implementing partitioning in the AQ to enable batch processing with array dequeue while ensuring:
1. Ordering maintained per partition (FIFO).
2. Possibility to Array Deqeue on Application side. ( Unless provided a more performant solution ).
Approach & Observation:1. We partition messages using a sort of partition_id, ( User defined in queue table) assigning each message a hash (eg: 1-5 ) at enqueue time.
PROCEDURE Enqueu_Event_(
partition_id_ NUMBER,
message_content_ JSON_OBJECT_T
) IS
queue_payload_ TEST_PAYLOAD_TYPE;
r_enqueue_options_ DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties_ DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle_ RAW(16);
BEGIN
-- Construct payload with provided partition_id
queue_payload_ := TEST_PAYLOAD_TYPE(partition_id_, message_content_.to_blob());
-- Enqueue message
DBMS_AQ.ENQUEUE(
queue_name => OUT_QUEUE_NAME,
enqueue_options => r_enqueue_options_,
message_properties => r_message_properties_,
payload => queue_payload_,
msgid => v_message_handle_
);
COMMIT;
END Publish_Event_;
/
2. On the consumer side ( Polling ), we use AQDequeueOptions with:
dequeueOpts.navigation := DBMS_AQ.FIRST_MESSAGE; to fetch the first message for ordering.
deqeueOpts.deq_condition := ("partition_id = 1") to filter messages per partition.
PROCEDURE Dequeue_Events_(
partition_id_ NUMBER
) IS
r_dequeue_options_ DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties_ DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
v_message_handle_ DBMS_AQ.MSGID_ARRAY_T;
queue_payload_ TEST_PAYLOAD_TYPE;
batch_size_ CONSTANT PLS_INTEGER := 10; -- Adjust batch size as needed
BEGIN
r_dequeue_options_.navigation := DBMS_AQ.FIRST_MESSAGE;
r_dequeue_options_.dequeue_mode := DBMS_AQ.REMOVE;
-- Condition to filter by partition_id
r_dequeue_options_.condition := 'tab.partition_id = ' || TO_CHAR(partition_id_);
-- Array dequeue
DBMS_AQ.DEQUEUE_ARRAY(
queue_name => IN_QUEUE_NAME,
dequeue_options => r_dequeue_options_,
message_properties => r_message_properties_,
payload => queue_payload_,
num_msgs => batch_size_,
msgid => v_message_handle_
);
COMMIT;
END Dequeue_Events_;
/
Questions & Clarification:Does setting
DBMS_AQ.FIRST_MESSAGE; override the lack of ordering guarantee in deq_condition?
Can we reliably expect FIFO ordering per partition even when using deq_condition?
If not, what is the best approach to ensure parallel processing per partition while preserving order?
Is there a more efficient way to implement partitioned parallel dequeuing without sharded queues?
Links:
deq_condition ordering not guaranteed -
https://docs.oracle.com/en/database/oracle/oracle-database/19/arpls/advanced-queuing-AQ-types.html#GUID-B11D312F-0EF5-4048-809B-630426E1E81A:~:text=dequeuing%20is%20undetermined.-,deq_condition,-A%20conditional%20expression First message navigation →
https://docs.oracle.com/en/database/oracle/oracle-database/19/arpls/advanced-queuing-AQ-types.html#GUID-B11D312F-0EF5-4048-809B-630426E1E81A:~:text=FIRST_MESSAGE_MULTI_GROUP