Using Advanced Queueing v4.0.10.2

EDB Postgres Advanced Server Advanced Queueing provides message queueing and message processing for the Advanced Server database. User-defined messages are stored in a queue; a collection of queues is stored in a queue table. You should first create a queue table before creating a queue that is dependent on it.

On the server side, procedures in the DBMS_AQADM package create and manage message queues and queue tables. Use the DBMS_AQ package to add or remove messages from a queue, or register or unregister a PL/SQL callback procedure. For more information about DBMS_AQ and DBMS_AQADM, click here.

On the client side, application uses EDB.NET driver to enqueue/dequeue message.

Enqueueing or Dequeueing a Message

For more information about using Advanced Servers Advanced Queueing functionality, see the Database Compatibility for Oracle Developers Built-in Package Guide.

Server-Side Setup

To use Advanced Queueing functionality on your .NET application, you must first create a user defined type, queue table, and queue, and then start the queue on the database server. Invoke EDB-PSQL and connect to the Advanced Server host database. Use the following SPL commands at the command line:

Creating a User-defined Type

To specify a RAW data type, you should create a user-defined type. The following example demonstrates creating a user-defined type named as myxml.

CREATE TYPE myxml AS (value XML);

Creating the Queue Table

A queue table can hold multiple queues with the same payload type. The following example demonstrates creating a table named MSG_QUEUE_TABLE.

EXEC DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'MSG_QUEUE_TABLE',
       queue_payload_type => 'myxml',
       comment => 'Message queue table');
END;

Creating the Queue

The following example demonstrates creating a queue named MSG_QUEUE within the table MSG_QUEUE_TABLE.

BEGIN
DBMS_AQADM.CREATE_QUEUE ( queue_name => 'MSG_QUEUE', queue_table => 'MSG_QUEUE_TABLE', comment => 'This queue contains pending messages.');
END;

Starting the Queue

Once the queue is created, invoke the following SPL code at the command line to start a queue in the EDB database.

BEGIN
DBMS_AQADM.START_QUEUE
(queue_name => 'MSG_QUEUE');
END;

Client-side Example

Once you have created a user-defined type, followed by queue table and queue, start the queue. Then, you can enqueue or dequeue a message using EDB .Net drivers.

Enqueue a message:

To enqueue a message on your .NET application, you must:

  1. Import the EnterpriseDB.EDBClient namespace.
  2. Pass the name of the queue and create the instance of the EDBAQQueue.
  3. Create the enqueue message and define a payload.
  4. Call the queue.Enqueue method.

The following code listing demonstrates using the queue.Enqueue method:

Note

The following code creates the message and serializes it. This is just an example code and is not going to compile if copied as it is. It is the responsibility of the user to serialize the message as XML.

using EnterpriseDB.EDBClient;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace AQXml
{
    class MyXML
    {
        public string value { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            int messagesToSend = 1;
            if (args.Length > 0 && !string.IsNullOrEmpty(args[0]))
            {
                messagesToSend = int.Parse(args[0]);
            }
            for (int i = 0; i < 5; i++)
            {
                EnqueMsg("test message: " + i);
            }
        }

      private static EDBConnection GetConnection()
        {
            string connectionString = "Server=127.0.0.1;Host=127.0.0.1;Port=5444;User Id=enterprisedb;Password=test;Database=edb;Timeout=999";
            EDBConnection connection = new EDBConnection(connectionString);
            connection.Open();
            return connection;
        }


        private static string ByteArrayToString(byte[] byteArray)
        {
            // Sanity check if it's null so we don't incur overhead of an exception
            if (byteArray == null)
            {
                return string.Empty;
            }
            try
            {
                StringBuilder hex = new StringBuilder(byteArray.Length * 2);
                foreach (byte b in byteArray)
                {
                    hex.AppendFormat("{0:x2}", b);
                }

                return hex.ToString().ToUpper();
            }
            catch
            {
                return string.Empty;
            }
        }

        private static bool EnqueMsg(string msg)
        {
            EDBConnection con = GetConnection();
            using (EDBAQQueue queue = new EDBAQQueue("MSG_QUEUE", con))
            {
                queue.MessageType = EDBAQMessageType.Xml;
                EDBTransaction txn = queue.Connection.BeginTransaction();
                QueuedEntities.Message queuedMessage = new QueuedEntities.Message() { MessageText = msg };

                try
                {
                    string rootElementName = queuedMessage.GetType().Name;
                    if (rootElementName.IndexOf(".") != -1)
                    {
                        rootElementName = rootElementName.Split('.').Last();
                    }

                    string xml = new Utils.XmlFragmentSerializer<QueuedEntities.Message>().Serialize(queuedMessage);
                    EDBAQMessage queMsg = new EDBAQMessage();
                    queMsg.Payload = new MyXML { value = xml };
                    queue.MessageType = EDBAQMessageType.Udt;
                    queue.UdtTypeName = "myxml";
                    queue.Enqueue(queMsg);
                    var messageId = ByteArrayToString((byte[])queMsg.MessageId);
                    Console.WriteLine("MessageID: " + messageId);
                    txn.Commit();
                    queMsg = null;
                    xml = null;
                    rootElementName = null;
                    return true;
                }
                catch (Exception ex)
                {
                    txn?.Rollback();
                    Console.WriteLine("Failed to enqueue message.");
                    Console.WriteLine(ex.ToString());
                    return false;
                }
                finally
                {
                    queue?.Connection?.Dispose();
                }
            }
        }

    }
}

Dequeueing a message

To dequeue a message on your .NET application, you must:

  1. Import the EnterpriseDB.EDBClient namespace.
  2. Pass the name of the queue and create the instance of the EDBAQQueue.
  3. Call the queue.Dequeue method.
Note

The following code creates the message and serializes it. This is just an example code and is not going to compile if copied as it is. It is the responsibility of the user to serialize the message as XML.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using EnterpriseDB.EDBClient;

namespace DequeueXML
{
    class MyXML
    {
        public string value { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            DequeMsg();
        }


        private static EDBConnection GetConnection()
        {
            string connectionString = "Server=127.0.0.1;Host=127.0.0.1;Port=5444;User Id=enterprisedb;Password=test;Database=edb;Timeout=999";
            EDBConnection connection = new EDBConnection(connectionString);
            connection.Open();
            return connection;
        }


        private static string ByteArrayToString(byte[] byteArray)
        {
            // Sanity check if it's null so we don't incur overhead of an exception
            if (byteArray == null)
            {
                return string.Empty;
            }
            try
            {
                StringBuilder hex = new StringBuilder(byteArray.Length * 2);
                foreach (byte b in byteArray)
                {
                    hex.AppendFormat("{0:x2}", b);
                }

                return hex.ToString().ToUpper();
            }
            catch
            {
                return string.Empty;
            }
        }
        public static void DequeMsg(int waitTime = 10)
        {
            EDBConnection con = GetConnection();
            using (EDBAQQueue queueListen = new EDBAQQueue("MSG_QUEUE", con))
            {
                queueListen.UdtTypeName = "myxml";
                queueListen.DequeueOptions.Navigation = EDBAQNavigationMode.FIRST_MESSAGE;
                queueListen.DequeueOptions.Visibility = EDBAQVisibility.ON_COMMIT;
                queueListen.DequeueOptions.Wait = 1;
                EDBTransaction txn = null;

                while (1 == 1)
                    {

                    if (queueListen.Connection.State == System.Data.ConnectionState.Closed)
                    {
                        queueListen.Connection.Open();
                    }

                    string messageId = "Unknown";
                    try
                    {
                        // the listen function is a blocking function. It will Wait the specified waitTime or until a
                        // message is received.
                        Console.WriteLine("Listening...");
                        string v = queueListen.Listen(null, waitTime);
                        // If we are waiting for a message and we specify a Wait time,
                        // then if there are no more messages, we want to just bounce out.
                        if (waitTime > -1 && v == null)
                        {
                            Console.WriteLine("No message received during Wait period.");
                            Console.WriteLine();
                            continue;
                        }

                        // once we're here that means a message has been detected in the queue. Let's deal with it.
                        txn = queueListen.Connection.BeginTransaction();

                        Console.WriteLine("Attempting to dequeue message...");
                        // dequeue the message
                        EDBAQMessage deqMsg;
                        try
                        {
                            deqMsg = queueListen.Dequeue();
                        }
                        catch (Exception ex)
                        {
                            if (ex.Message.Contains("ORA-25228"))
                            {
                                Console.WriteLine("Message was not there.  Another process must have picked it up.");
                                Console.WriteLine();
                                txn.Rollback();
                                continue;
                            }
                            else
                            {
                                throw;
                            }
                        }

                        messageId = ByteArrayToString((byte[])deqMsg.MessageId);
                        if (deqMsg != null)
                        {
                            Console.WriteLine("Processing received message...");
                            // process the message payload
                            MyXML obj = new MyXML();
                            queueListen.Map<MyXML>(deqMsg.Payload, obj);

                            QueuedEntities.Message msg = new Utils.XmlFragmentSerializer<QueuedEntities.Message>().Deserialize(obj.value);

                            Console.WriteLine("Received Message:");
                            Console.WriteLine("MessageID: " + messageId);
                            Console.WriteLine("Message: " + msg.MessageText);
                            Console.WriteLine("Enqueue Time" + queueListen.MessageProperties.EnqueueTime);

                            txn.Commit();

                            Console.WriteLine("Finished processing message");
                            Console.WriteLine();

                        }
                        else
                        {
                            Console.WriteLine("Message was not dequeued.");
                        }
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Failed To dequeue or process the dequeued message.");
                        Console.WriteLine(ex.ToString());
                        Console.WriteLine();
                        if (txn != null)
                        {
                            txn.Rollback();
                            if (txn != null)
                            {
                                txn.Dispose();
                            }
                        }
                    }
                }
            }

        }
    }
}

EDBAQ Classes

The following EDBAQ classes are used in this application:

EDBAQDequeueMode

The EDBAQDequeueMode class lists all the dequeuer modes available.

ValueDescription
BrowseRead the message without locking.
LockedReads and gets a write lock on the message.
RemoveDeletes the message after reading. This is the default value.
Remove_NoDataConfirms receipt of the message.

EDBAQDequeueOptions

The EDBAQDequeueOptions class lists the options available when dequeuing a message.

PropertyDescription
ConsumerNameThe name of the consumer for which to dequeue the message.
DequeueModeThis is set from EDBAQDequeueMode. It represents the locking behavior linked with the dequeue option.
NavigationThis is set from EDBAQNavigationMode. It represents the position of the message that will be fetched.
VisibilityThis is set from EDBAQVisibility. It represents whether the new message is dequeued or not as part of the current transaction.
WaitThe wait time for a message as per the search criteria.
MsgidThe message identifier.
CorrelationThe correlation identifier.
DeqConditionThe dequeuer condition. It is a Boolean expression.
TransformationThe transformation that will be applied before dequeuing the message.
DeliveryModeThe delivery mode of the dequeued message.

EDBAQEnqueueOptions

The EDBAQEnqueueOptions class lists the options available when enqueuing a message.

PropertyDescription
VisibilityThis is set from EDBAQVisibility. It represents whether the new message is enqueued or not as part of the current transaction.
RelativeMsgidThe relative message identifier.
SequenceDeviationThe sequence when the message should be dequeued.
TransformationThe transformation that will be applied before enqueuing the message.
DeliveryModeThe delivery mode of the enqueued message.

EDBAQMessage

The EDBAQMessage class lists a message to be enqueued/dequeued.

PropertyDescription
PayloadThe actual message to be queued.
MessageIdThe ID of the queued message.

EDBAQMessageProperties

The EDBAQMessageProperties lists the message properties available.

PropertyDescription
PriorityThe priority of the message.
DelayThe duration post which the message is available for dequeuing. This is specified in seconds.
ExpirationThe duration for which the message is available for dequeuing. This is specified in seconds.
CorrelationThe correlation identifier.
AttemptsThe number of attempts taken to dequeue the message.
RecipientListThe receipients list that overthrows the default queue subscribers.
ExceptionQueueThe name of the queue where the unprocessed messages should be moved.
EnqueueTimeThe time when the message was enqueued.
StateThe state of the message while dequeue.
OriginalMsgidThe message identifier in the last queue.
TransactionGroupThe transaction group for the dequeued messages.
DeliveryModeThe delivery mode of the dequeued message.

EDBAQMessageState

The EDBAQMessageState class represents the state of the message during dequeue.

ValueDescription
ExpiredThe message is moved to the exception queue.
ProcessedThe message is processed and kept.
ReadyThe message is ready to be processed.
WaitingThe message is in waiting state. The delay is not reached.

EDBAQMessageType

The EDBAQMessageType class represents the types for payload.

ValueDescription
RawThe raw message type.

Note: Currently, this payload type is not supported.
UDTThe user defined type message.
XMLThe XML type message.

Note: Currently, this payload type is not supported.

EDBAQNavigationMode

The EDBAQNavigationMode class represents the different types of navigation modes available.

ValueDescription
First_MessageReturns the first available message that matches the search terms.
Next_MessageReturns the next available message that matches the search items.
Next_TransactionReturns the first message of next transaction group.

EDBAQQueue

The EDBAQQueue class represents a SQL statement to execute DMBS_AQ functionality on a PostgreSQL database.

PropertyDescription
ConnectionThe connection to be used.
NameThe name of the queue.
MessageTypeThe message type that is enqueued/dequeued from this queue. For example EDBAQMessageType.Udt.
UdtTypeNameThe user defined type name of the message type.
EnqueueOptionsThe enqueue options to be used.
DequeuOptionsThe dequeue options to be used.
MessagePropertiesThe message properties to be used.

EDBAQVisibility

The EDBAQVisibility class represents the visibility options available.

ValueDescription
ImmediateThe enqueue/dequeue is not part of the ongoing transaction.
On_CommitThe enqueue/dequeue is part of the current transaction.
Note
  • To review the default options for the above parameters, click here.
  • EDB Advanced Queueing functionality uses user-defined types for calling enqueue/dequeue operations. Server Compatibility Mode=NoTypeLoading cannot be used with Advanced Queueing because NoTypeLoading will not load any user-defined types.