Using Advanced Queueing

EDB Postgres Advanced Server Advanced Queueing provides message queueing and message processing for the Advanced Server database. User-defined messages are stored in a queue; a collection of queues is stored in a queue table. Create the queue table before you create any queue that depends on it.

On the server side, procedures in the DBMS_AQADM package create and manage message queues and queue tables. Use the DBMS_AQ package to add or remove messages from a queue, or to register or unregister a PL/SQL callback procedure. For more information, see the documentation for the DBMS_AQ and DBMS_AQADM packages.

On the client side, the application uses the EDB .NET driver to enqueue and dequeue messages.

Enqueue or Dequeue a message

Server-side setup

To use Advanced Queueing functionality in your .NET application, you must first create a user-defined type, a queue table, and a queue, and then start the queue on the database server. Invoke EDB-PSQL and connect to the Advanced Server host database. Use the following SPL commands at the command line:

Creating a user-defined type

To specify a RAW data type, you should create a user-defined type. The following example demonstrates creating a user-defined type named myxml.

CREATE TYPE myxml AS (value XML);

Creating the queue table

A queue table can hold multiple queues with the same payload type. The following example demonstrates creating a table named MSG_QUEUE_TABLE.

EXEC DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'MSG_QUEUE_TABLE',
       queue_payload_type => 'myxml',
       comment => 'Message queue table');

Creating the queue

The following example demonstrates creating a queue named MSG_QUEUE within the table MSG_QUEUE_TABLE.

BEGIN
DBMS_AQADM.CREATE_QUEUE
      (queue_name => 'MSG_QUEUE',
       queue_table => 'MSG_QUEUE_TABLE',
       comment => 'This queue contains pending messages.');
END;

Starting the queue

Once the queue is created, invoke the following SPL code at the command line to start the queue in the EDB database.

BEGIN
DBMS_AQADM.START_QUEUE
(queue_name => 'MSG_QUEUE');
END;

Client-side sample

Once you have created the user-defined type, the queue table, and the queue, and have started the queue, you can enqueue or dequeue a message using the EDB .NET driver.

Enqueue a message

To enqueue a message in your .NET application, you must:

  1. Import the EnterpriseDB.EDBClient namespace.

  2. Create an instance of EDBAQQueue, passing the name of the queue and the connection.

  3. Create the enqueue message and define payload.

  4. Call the queue.Enqueue method.

The following code listing demonstrates using the queue.Enqueue method:

using EnterpriseDB.EDBClient;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace AQXml
{
    class MyXML
    {
        public string value { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            int messagesToSend = 1;
            if (args.Length > 0 && !string.IsNullOrEmpty(args[0]))
            {
                messagesToSend = int.Parse(args[0]);
            }
            // Send the number of messages requested on the command line (default 1).
            for (int i = 0; i < messagesToSend; i++)
            {
                EnqueMsg("test message: " + i);
            }
        }

        private static EDBConnection GetConnection()
        {
            string connectionString = "Server=127.0.0.1;Host=127.0.0.1;Port=5444;User Id=enterprisedb;Password=test;Database=edb;Timeout=999";
            EDBConnection connection = new EDBConnection(connectionString);
            connection.Open();
            return connection;
        }


        private static string ByteArrayToString(byte[] byteArray)
        {
            // Sanity check if it's null so we don't incur overhead of an exception
            if (byteArray == null)
            {
                return string.Empty;
            }
            try
            {
                StringBuilder hex = new StringBuilder(byteArray.Length * 2);
                foreach (byte b in byteArray)
                {
                    hex.AppendFormat("{0:x2}", b);
                }

                return hex.ToString().ToUpper();
            }
            catch
            {
                return string.Empty;
            }
        }

        private static bool EnqueMsg(string msg)
        {
            EDBConnection con = GetConnection();
            using (EDBAQQueue queue = new EDBAQQueue("MSG_QUEUE", con))
            {
                queue.MessageType = EDBAQMessageType.Xml;
                EDBTransaction txn = queue.Connection.BeginTransaction();
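                // QueuedEntities.Message and Utils.XmlFragmentSerializer are application-defined
                // helper types that are not shown in this listing.
                QueuedEntities.Message queuedMessage = new QueuedEntities.Message() { MessageText = msg };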

                try
                {
                    string rootElementName = queuedMessage.GetType().Name;
                    if (rootElementName.IndexOf(".") != -1)
                    {
                        rootElementName = rootElementName.Split('.').Last();
                    }

                    string xml = new Utils.XmlFragmentSerializer<QueuedEntities.Message>().Serialize(queuedMessage);
                    EDBAQMessage queMsg = new EDBAQMessage();
                    queMsg.Payload = new MyXML { value = xml };
                    queue.MessageType = EDBAQMessageType.Udt;
                    queue.UdtTypeName = "myxml";
                    queue.Enqueue(queMsg);
                    var messageId = ByteArrayToString((byte[])queMsg.MessageId);
                    Console.WriteLine("MessageID: " + messageId);
                    txn.Commit();
                    queMsg = null;
                    xml = null;
                    rootElementName = null;
                    return true;
                }
                catch (Exception ex)
                {
                    txn?.Rollback();
                    Console.WriteLine("Failed to enqueue message.");
                    Console.WriteLine(ex.ToString());
                    return false;
                }
                finally
                {
                    queue?.Connection?.Dispose();
                }
            }
        }

    }
}
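
The enqueue listing above refers to QueuedEntities.Message and Utils.XmlFragmentSerializer<T>, neither of which is defined in it. The sketch below shows one way such helpers could look, assuming they are ordinary application classes built on System.Xml.Serialization. Only the names, the MessageText property, and the Serialize and Deserialize methods come from the listing; the implementation is an illustrative guess and is not part of the EDB .NET driver.

```csharp
using System;
using System.IO;
using System.Xml;
using System.Xml.Serialization;

namespace QueuedEntities
{
    // Hypothetical payload entity; the listings only use MessageText.
    public class Message
    {
        public string MessageText { get; set; }
    }
}

namespace Utils
{
    // Hypothetical helper: serializes T to an XML fragment
    // (no XML declaration, no xmlns attributes) and reads it back.
    public class XmlFragmentSerializer<T>
    {
        private readonly XmlSerializer serializer = new XmlSerializer(typeof(T));

        public string Serialize(T value)
        {
            var settings = new XmlWriterSettings { OmitXmlDeclaration = true };

            // An empty prefix/namespace pair suppresses the default xmlns:xsi and xmlns:xsd attributes.
            var namespaces = new XmlSerializerNamespaces();
            namespaces.Add(string.Empty, string.Empty);

            using (var text = new StringWriter())
            using (var writer = XmlWriter.Create(text, settings))
            {
                serializer.Serialize(writer, value, namespaces);
                writer.Flush();
                return text.ToString();
            }
        }

        public T Deserialize(string xml)
        {
            using (var reader = new StringReader(xml))
            {
                return (T)serializer.Deserialize(reader);
            }
        }
    }
}
```

With helpers along these lines in the project, the string returned by Serialize is what the listing assigns to MyXML.value before calling queue.Enqueue, and Deserialize reverses the step on the dequeue side.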

Dequeue a message

To dequeue a message in your .NET application, you must:

  1. Import the EnterpriseDB.EDBClient namespace.

  2. Create an instance of EDBAQQueue, passing the name of the queue and the connection.

  3. Call the queue.Dequeue method.

The following code listing demonstrates using the queue.Dequeue method:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using EnterpriseDB.EDBClient;

namespace DequeueXML
{
    class MyXML
    {
        public string value { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            DequeMsg();
        }


        private static EDBConnection GetConnection()
        {
            string connectionString = "Server=127.0.0.1;Host=127.0.0.1;Port=5444;User Id=enterprisedb;Password=test;Database=edb;Timeout=999";
            EDBConnection connection = new EDBConnection(connectionString);
            connection.Open();
            return connection;
        }


        private static string ByteArrayToString(byte[] byteArray)
        {
            // Sanity check if it's null so we don't incur overhead of an exception
            if (byteArray == null)
            {
                return string.Empty;
            }
            try
            {
                StringBuilder hex = new StringBuilder(byteArray.Length * 2);
                foreach (byte b in byteArray)
                {
                    hex.AppendFormat("{0:x2}", b);
                }

                return hex.ToString().ToUpper();
            }
            catch
            {
                return string.Empty;
            }
        }
        public static void DequeMsg(int waitTime = 10)
        {
            EDBConnection con = GetConnection();
            using (EDBAQQueue queueListen = new EDBAQQueue("MSG_QUEUE", con))
            {
                queueListen.UdtTypeName = "myxml";
                queueListen.DequeueOptions.Navigation = EDBAQNavigationMode.FIRST_MESSAGE;
                queueListen.DequeueOptions.Visibility = EDBAQVisibility.ON_COMMIT;
                queueListen.DequeueOptions.Wait = 1;
                EDBTransaction txn = null;

                while (true)
                {

                    if (queueListen.Connection.State == System.Data.ConnectionState.Closed)
                    {
                        queueListen.Connection.Open();
                    }

                    string messageId = "Unknown";
                    try
                    {
                        // the listen function is a blocking function. It will Wait the specified waitTime or until a
                        // message is received.
                        Console.WriteLine("Listening...");
                        string v = queueListen.Listen(null, waitTime);
                        // If we are waiting for a message and we specify a Wait time,
                        // then if there are no more messages, we want to just bounce out.
                        if (waitTime > -1 && v == null)
                        {
                            Console.WriteLine("No message received during Wait period.");
                            Console.WriteLine();
                            continue;
                        }

                        // once we're here that means a message has been detected in the queue. Let's deal with it.
                        txn = queueListen.Connection.BeginTransaction();

                        Console.WriteLine("Attempting to dequeue message...");
                        // dequeue the message
                        EDBAQMessage deqMsg;
                        try
                        {
                            deqMsg = queueListen.Dequeue();
                        }
                        catch (Exception ex)
                        {
                            if (ex.Message.Contains("ORA-25228"))
                            {
                                Console.WriteLine("Message was not there.  Another process must have picked it up.");
                                Console.WriteLine();
                                txn.Rollback();
                                continue;
                            }
                            else
                            {
                                throw;
                            }
                        }

                        if (deqMsg != null)
                        {
                            // Read the message ID only after confirming that a message was returned.
                            messageId = ByteArrayToString((byte[])deqMsg.MessageId);
                            Console.WriteLine("Processing received message...");
                            // process the message payload
                            MyXML obj = new MyXML();
                            queueListen.Map<MyXML>(deqMsg.Payload, obj);

                            QueuedEntities.Message msg = new Utils.XmlFragmentSerializer<QueuedEntities.Message>().Deserialize(obj.value);

                            Console.WriteLine("Received Message:");
                            Console.WriteLine("MessageID: " + messageId);
                            Console.WriteLine("Message: " + msg.MessageText);
                            Console.WriteLine("Enqueue Time: " + queueListen.MessageProperties.EnqueueTime);

                            txn.Commit();

                            Console.WriteLine("Finished processing message");
                            Console.WriteLine();

                        }
                        else
                        {
                            Console.WriteLine("Message was not dequeued.");
                        }
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Failed To dequeue or process the dequeued message.");
                        Console.WriteLine(ex.ToString());
                        Console.WriteLine();
                        if (txn != null)
                        {
                            txn.Rollback();
                            txn.Dispose();
                            txn = null;
                        }
                    }
                }
            }

        }
    }
}
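
Both listings turn the byte-array message ID into a hexadecimal string with a hand-written StringBuilder loop. As a minimal alternative sketch, the same uppercase, separator-free output can be produced with BitConverter from the base class library. The class and method names here (MessageIdFormatter, ToHex) are hypothetical and are not part of the EDB .NET driver.

```csharp
using System;

public static class MessageIdFormatter
{
    // Returns the bytes as uppercase hexadecimal with no separators,
    // or an empty string when the array is null.
    public static string ToHex(byte[] bytes)
    {
        if (bytes == null)
        {
            return string.Empty;
        }

        // BitConverter.ToString yields "0A-FF-..."; removing the dashes leaves "0AFF...".
        return BitConverter.ToString(bytes).Replace("-", string.Empty);
    }
}
```

In the listings, a call such as MessageIdFormatter.ToHex((byte[])deqMsg.MessageId) could stand in for ByteArrayToString.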

EDBAQ Classes

The following EDBAQ classes are used in this application:

EDBAQDequeueMode

The EDBAQDequeueMode class lists all the dequeue modes available.

| Value | Description |
| --- | --- |
| Browse | Reads the message without locking. |
| Locked | Reads and gets a write lock on the message. |
| Remove | Deletes the message after reading. This is the default value. |
| Remove_NoData | Confirms receipt of the message. |

EDBAQDequeueOptions

The EDBAQDequeueOptions class lists the options available when dequeuing a message.

| Property | Description |
| --- | --- |
| ConsumerName | The name of the consumer for which to dequeue the message. |
| DequeueMode | This is set from EDBAQDequeueMode. It represents the locking behavior linked with the dequeue option. |
| Navigation | This is set from EDBAQNavigationMode. It represents the position of the message that will be fetched. |
| Visibility | This is set from EDBAQVisibility. It represents whether the message is dequeued as part of the current transaction. |
| Wait | The wait time for a message that matches the search criteria. |
| Msgid | The message identifier. |
| Correlation | The correlation identifier. |
| DeqCondition | The dequeue condition. It is a Boolean expression. |
| Transformation | The transformation that will be applied before dequeuing the message. |
| DeliveryMode | The delivery mode of the dequeued message. |

EDBAQEnqueueOptions

The EDBAQEnqueueOptions class lists the options available when enqueuing a message.

| Property | Description |
| --- | --- |
| Visibility | This is set from EDBAQVisibility. It represents whether the new message is enqueued as part of the current transaction. |
| RelativeMsgid | The relative message identifier. |
| SequenceDeviation | The sequence when the message should be dequeued. |
| Transformation | The transformation that will be applied before enqueuing the message. |
| DeliveryMode | The delivery mode of the enqueued message. |

EDBAQMessage

The EDBAQMessage class represents a message to be enqueued or dequeued.

| Property | Description |
| --- | --- |
| Payload | The actual message to be queued. |
| MessageId | The ID of the queued message. |

EDBAQMessageProperties

The EDBAQMessageProperties class lists the message properties available.

| Property | Description |
| --- | --- |
| Priority | The priority of the message. |
| Delay | The duration after which the message is available for dequeuing. This is specified in seconds. |
| Expiration | The duration for which the message is available for dequeuing. This is specified in seconds. |
| Correlation | The correlation identifier. |
| Attempts | The number of attempts taken to dequeue the message. |
| RecipientList | The recipient list that overrides the default queue subscribers. |
| ExceptionQueue | The name of the queue where the unprocessed messages should be moved. |
| EnqueueTime | The time when the message was enqueued. |
| State | The state of the message at the time of dequeue. |
| OriginalMsgid | The message identifier in the last queue. |
| TransactionGroup | The transaction group for the dequeued messages. |
| DeliveryMode | The delivery mode of the dequeued message. |

EDBAQMessageState

The EDBAQMessageState class represents the state of the message during dequeue.

| Value | Description |
| --- | --- |
| Expired | The message is moved to the exception queue. |
| Processed | The message is processed and kept. |
| Ready | The message is ready to be processed. |
| Waiting | The message is in waiting state. The delay is not reached. |
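
To make the relationship between the Delay and Expiration properties and the Waiting, Ready, and Expired states concrete, the following is a small local model. It is not the driver's or the server's implementation. It assumes that the expiration period starts counting once the delay has elapsed, which is one reading of the property descriptions above, and it leaves out the Processed state because that depends on a dequeue having taken place. The enum and class names are hypothetical.

```csharp
using System;

// Hypothetical local model; these names are not part of the EDB .NET driver.
public enum SketchMessageState { Waiting, Ready, Expired }

public static class MessageStateSketch
{
    // delaySeconds: time after enqueue before the message becomes available.
    // expirationSeconds: how long the message stays available once the delay has passed (assumption).
    public static SketchMessageState StateAt(
        DateTime enqueueTime, int delaySeconds, int expirationSeconds, DateTime now)
    {
        DateTime availableFrom = enqueueTime.AddSeconds(delaySeconds);
        DateTime availableUntil = availableFrom.AddSeconds(expirationSeconds);

        if (now < availableFrom)
        {
            return SketchMessageState.Waiting;   // the delay is not reached
        }
        if (now >= availableUntil)
        {
            return SketchMessageState.Expired;   // would be moved to the exception queue
        }
        return SketchMessageState.Ready;         // available for dequeuing
    }
}
```

For example, a message enqueued with a delay of 10 seconds and an expiration of 60 seconds is modeled as Waiting for the first 10 seconds, Ready for the following 60 seconds, and Expired after that.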

EDBAQMessageType

The EDBAQMessageType class represents the types for payload.

| Value | Description |
| --- | --- |
| Raw | The raw message type. Note: Currently, this payload type is not supported. |
| UDT | The user-defined type message. |
| XML | The XML type message. Note: Currently, this payload type is not supported. |

EDBAQNavigationMode

The EDBAQNavigationMode class represents the different types of navigation modes available.

| Value | Description |
| --- | --- |
| First_Message | Returns the first available message that matches the search terms. |
| Next_Message | Returns the next available message that matches the search terms. |
| Next_Transaction | Returns the first message of the next transaction group. |

EDBAQQueue

The EDBAQQueue class represents a SQL statement to execute DBMS_AQ functionality on a PostgreSQL database.

| Property | Description |
| --- | --- |
| Connection | The connection to be used. |
| Name | The name of the queue. |
| MessageType | The message type that is enqueued to or dequeued from this queue, for example EDBAQMessageType.Udt. |
| UdtTypeName | The user-defined type name of the message type. |
| EnqueueOptions | The enqueue options to be used. |
| DequeueOptions | The dequeue options to be used. |
| MessageProperties | The message properties to be used. |

EDBAQVisibility

The EDBAQVisibility class represents the visibility options available.

| Value | Description |
| --- | --- |
| Immediate | The enqueue/dequeue is not part of the ongoing transaction. |
| On_Commit | The enqueue/dequeue is part of the current transaction. |

Note

  • To review the default options for the above parameters, see the reference documentation for these classes.

  • EDBAQ functionality uses user defined types for calling enqueue/dequeue operations. Server Compatibility Mode=NoTypeLoading cannot be used with EDBAQ because NoTypeLoading will not load any user defined types.
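
Because NoTypeLoading is incompatible with EDBAQ, an application might want to check its connection string before creating a queue object. The following is a minimal sketch of such a guard using DbConnectionStringBuilder from System.Data.Common. The class and method names are hypothetical, and the check assumes the keyword is spelled exactly as in the note above (Server Compatibility Mode); any alternative spellings the driver may accept are not covered.

```csharp
using System;
using System.Data.Common;

public static class AqConnectionCheck
{
    // Returns true when the connection string sets
    // "Server Compatibility Mode" to NoTypeLoading (case-insensitive).
    public static bool UsesNoTypeLoading(string connectionString)
    {
        // DbConnectionStringBuilder parses generic key=value pairs and
        // treats keywords case-insensitively.
        var builder = new DbConnectionStringBuilder { ConnectionString = connectionString };

        return builder.TryGetValue("Server Compatibility Mode", out object mode)
            && string.Equals(Convert.ToString(mode), "NoTypeLoading", StringComparison.OrdinalIgnoreCase);
    }
}
```

A caller could refuse to continue, or log a warning, when UsesNoTypeLoading returns true for the connection string it is about to pass to EDBConnection.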