Quickstart for Cloud Queues#

Cloud Queues is an open source, scalable, and highly available message and notifications service, based on the OpenStack Marconi project. Cloud Queues uses a few basic components–queues, messages, and claims–which give you the flexibility you need to create powerful web applications in the cloud.

This service supports a variety of messaging patterns, such as the producer-consumer model and the publisher-subscriber model, through which you create and manage queues and messages.

  • In the producer-consumer model, you create queues in which producers, or servers, can post messages. Workers, or consumers, can then claim those messages and delete them after they complete the actions associated with the messages. A single claim can contain multiple messages, and administrators can query claims for status. This pattern is ideal for dispatching jobs to multiple processors.

  • In the publisher-subscriber model, the publisher sends a message to the queue. All subscribers (or workers) listen for messages in the queue, but they do not claim them. Multiple subscribers can work on a message. Messages are eventually deleted based on their time to live (TTL) value. This pattern is ideal for notification of events to multiple workers at once.

Concepts#

To use this service effectively, you should understand how these key ideas are used in this context:

claim

The process of a worker checking out a message to perform a task. Claiming a message prevents other workers from attempting to process the same message.

message

A task, a notification, or any meaningful data that a producer or publisher sends to the queue.

queue

The entity that holds messages. Ideally, a queue is created per work type. For example, if you want to compress files, you would create a queue dedicated to files awaiting compression.

Authentication#

To use this service you have to authenticate first. To do this, you will need your Rackspace username and API key. Your username is the one you use to login to the Cloud Control Panel at http://mycloud.rackspace.com/.

To find your API key, use the instructions in View and reset your API key.

You can specify a default region. Here is a list of available regions:

  • DFW (Dallas-Fort Worth, TX, US)

  • HKG (Hong Kong, China)

  • IAD (Blacksburg, VA, US)

  • LON (London, England)

  • SYD (Sydney, Australia)

Some users have access to another region in ORD (Chicago, IL). New users will not have this region.

Once you have these pieces of information, you can pass them into the SDK by replacing {username}, {apiKey}, and {region} with your info:

CloudIdentity cloudIdentity = new CloudIdentity()
{
    APIKey = "{apikey}",
    Username = "{username}"
};
// Create the provider as well; it'll be used in every method
CloudQueuesProvider cloudQueuesProvider =
      new CloudQueuesProvider(
              cloudIdentity,
              "{region}",
              Guid.NewGuid(),
              false,
              null);
// Not currently supported by this SDK
// Authentication in jclouds is lazy and happens on the first call to the cloud.
// Though the project name changed to Zaqar, use the original Marconi here.
MarconiApi marconiApi = ContextBuilder.newBuilder("rackspace-cloudqueues-us")
    .credentials("{username}", "{apiKey}")
    .buildApi(MarconiApi.class);
// Not currently supported by this SDK
require 'vendor/autoload.php';

use OpenCloud\Rackspace;

// Instantiate a Rackspace client.
$client = new Rackspace(Rackspace::US_IDENTITY_ENDPOINT, array(
    'username' => '{username}',
    'apiKey'   => '{apiKey}'
));
import pyrax
# for queues, we also need to generate a client ID
import uuid
my_client_id = str(uuid.uuid4())

pyrax.set_setting("identity_type", "rackspace")
pyrax.set_default_region('{region}')
pyrax.set_credentials('{username}', '{apiKey}')
@client = Fog::Rackspace::Queues.new(
  :rackspace_username => '{username}',
  :rackspace_api_key => '{apikey}',
  :rackspace_region => '{region}'
)
# {username}, {apiKey} below are placeholders, do not enclose '{}' when you replace them with actual credentials.

curl -s https://identity.api.rackspacecloud.com/v2.0/tokens -X 'POST' \
  -d '{"auth":{"RAX-KSKEY:apiKeyCredentials":{"username":"{username}", "apiKey":"{apiKey}"}}}' \
  -H "Content-Type: application/json" | python -m json.tool

# From the resulting json, set three environment variables: TENANT, TOKEN and ENDPOINT.

export TENANT="{tenantId}"
export TOKEN="{tokenId}"
export ENDPOINT="{publicUrl}" # For the cloud queues service

Use the API#

Some of the basic operations you can perform with this API are described below.

Create queue#

To create a queue:

QueueName queueName = new QueueName("{queue_name}");
bool created = await cloudQueuesProvider.CreateQueueAsync(queueName, CancellationToken.None);
await cloudQueuesProvider.DeleteQueueAsync(queueName, CancellationToken.None);
// Not currently supported by this SDK
QueueApi queueApi = marconiApi.getQueueApi("{region}", "{clientId}");

queueApi.create("{queueName}");
// Not currently supported by this SDK
// Obtain an Object Store service object from the client.
$queuesService = $client->queuesService(null, '{regionId}');

// You must set a unique client ID for every script that accesses the API
// This enforces responsibility when consuming and processing messages. If
// you do not want to set your own UUID, leave the arg empty and the SDK will
// create a default one:
$queuesService->setClientId();

$queue = $queuesService->createQueue('sample_queue');
queue = pyrax.queues.create("sample_queue")
queue = @client.queues.create(:name => 'sample_queue')
curl -X PUT $ENDPOINT/queues/{queueName} \
  -H "X-Auth-Token: $TOKEN" \
  -H "Accept: application/json" \
  -H "X-Project-Id: {projectId}"

List queues#

To list all the queues in a given region:

ReadOnlyCollectionPage<CloudQueue> queueList =
      await cloudQueuesProvider.ListQueuesAsync(null, null, true, CancellationToken.None);
// Not currently supported by this SDK
QueueApi queueApi = marconiApi.getQueueApi("{region}", "{clientId}");

List<Queue> queues = queueApi.list(true).concat().toList();
// Not currently supported by this SDK
$queues = $queuesService->listQueues();
pyrax.queues.list()
@client.queues.all
curl -X GET $ENDPOINT/queues \
  -H "Content-type: application/json" \
  -H "X-Auth-Token: $TOKEN" \
  -H "Accept: application/json" \
  -H "X-Project-Id: {projectId}"

Delete queue#

To delete a queue:

QueueName queueName = new QueueName("{queue_name}");
await cloudQueuesProvider.DeleteQueueAsync(queueName, CancellationToken.None);
// Not currently supported by this SDK
QueueApi queueApi = marconiApi.getQueueApi("{region}", "{clientId}");

queueApi.delete("{queueName}");
// Not currently supported by this SDK
$queue->delete();
pyrax.queues.delete("sample_queue")
queue.destroy
curl -X DELETE $ENDPOINT/queues/{queueName} \
  -H "Content-type: application/json" \
  -H "X-Auth-Token: $TOKEN" \
  -H "Accept: application/json" \
  -H "X-Project-Id: {projectId}"

Warning: Deleting a queue also deletes all messages within it.

Post message#

To post a message to a queue used by the consumer of the message:

QueueName queueName = new QueueName("{queue_name}");
TimeSpan ttl = TimeSpan.FromMinutes(900);
Newtonsoft.Json.Linq.JObject message_body =
     new Newtonsoft.Json.Linq.JObject("{\"play\": \"hockey\"}");
Message message = new Message(ttl, message_body);
Message[] messages = { message };
await cloudQueuesProvider.PostMessagesAsync(queueName, CancellationToken.None, messages);
// Not currently supported by this SDK
MessageApi messageApi =
    marconiApi.getMessageApi("{region}", "{clientId}", "{queueName}");

CreateMessage createMessage = CreateMessage.builder()
        .ttl(900)
        .body("{\"play\": \"hockey\"}")
        .build();
List<CreateMessage> createMessages = ImmutableList.of(createMessage);

MessagesCreated messagesCreated = messageApi.create(createMessages);
// Not currently supported by this SDK
// Post a message to the queue with a life of 900 seconds.
$queue->createMessage(array(
    'body' => 'Message body',
    'ttl'  => 900
));
#assign generated client ID (see authentication section)
pyrax.queues.client_id = my_client_id

queue = pyrax.queues.get("sample_queue")

# The 'body' parameter can be a simple value, or a chunk of XML, or pretty
#   much anything you need.
# The 'ttl' parameter sets the life of the message (in seconds).

queue.post_message("Message body", ttl=900)
# The 'body' parameter can be a String, a Hash, or an Array.
# The 'ttl' parameter sets the life of the message (in seconds).

queue.enqueue("Message body", 900)
curl -X POST $ENDPOINT/queues/{queueName}/messages -d \
  '[{"ttl": 300,"body": {"event": "BackupStarted"}},{"ttl": 60,"body": {"play": "hockey"}}]' \
  -H "Content-type: application/json" \
  -H "Client-ID: {clientId}" \
  -H "X-Auth-TOKEN: $TOKEN" \
  -H "Accept: application/json" \
  -H "X-Project-Id: {projectId}"

Claim messages#

To claim a message or messages from a queue used by the consumer of the messages:

QueueName queueName = new QueueName("{queue_name}");
int limit = 4;
TimeSpan ttl = TimeSpan.FromMinutes(900);
TimeSpan grace = TimeSpan.FromMinutes(120)
Claim claim = await cloudQueuesProvider.ClaimMessageAsync(
      queueName,
      limit ,
      ttl,
      grace,
      CancellationToken.None);
// Not currently supported by this SDK
ClaimApi claimApi =
    marconiApi.getClaimApi("{region}", "{clientId}", "{queueName}");

List<Message> messages = claimApi.claim(900, 120, 4);
// Not currently supported by this SDK
// Claim the 4 oldest messages in the queue. Keep the claim for 900 seconds (i.e. the claim TTL). Extend the life
// of these messages to 120 seconds past the claim TTL.
$claimedMessages = $queue->claimMessages(array(
    'limit' => 4,
    'ttl'   => 900,
    'grace' => 120
));
# Claims require the following parameters:
#   ttl: Defines how long the claim lasts (in seconds) before it is released.
#   grace: Defines how long (in seconds) to extend the life of a message to
#          match the life of the claim.
#   count: Optional. The number of messages to claim (up to 20 max).
#          Default = 10.
queue = pyrax.queues.get("sample_queue")
claim = queue.claim_messages(ttl=900, grace=120, count=4)
# Claims require the following parameters:
#   limit: The number of messages to claim.
#   ttl:   Defines how long the claim lasts (in seconds) before it is released.
#   grace: Defines how long (in seconds) to extend the life of a message to
#          match the life of the claim.
claim = queue.claims.create(
  :limit => 1,
  :ttl => 900,
  :grace => 600
)
curl -X POST $ENDPOINT/queues/{queueName}/claims -d \
  '{"ttl": 300,"grace":300}' \
  -H "Content-type: application/json" \
  -H "Client-ID: {clientId}" \
  -H "X-Auth-Token: $TOKEN" \
  -H "Accept: application/json" \
  -H "X-Project-Id: {projectId}"

Release claimed messages#

To release a claimed message so that a different consumer can attempt to process the message, if the consumer cannot complete the task specified in a message or simply refuses to do so:

QueueName queueName = new QueueName("{queue_name}");
TimeSpan ttl = TimeSpan.FromMinutes(900);
TimeSpan grace = TimeSpan.FromMinutes(60);
Claim claim = await cloudQueuesProvider.ClaimMessageAsync(
      queueName,
      null,
      ttl,
      grace,
      CancellationToken.None);
await cloudQueuesProvider.ReleaseClaimAsync(queueName, claim, CancellationToken.None);
// Not currently supported by this SDK
ClaimApi claimApi =
    marconiApi.getClaimApi("{region}", "{clientId}", "{queueName}");

claimApi.release("{claimId}");
// Not currently supported by this SDK
foreach ($claimedMessages as $claimedMessage) {
   $claimId = $claimedMessage->getClaimIdFromHref();
   $claim   = $queue->getClaim($claimId);
   $claim->delete();
}
queue = pyrax.queues.get("sample_queue")
queue.release_claim(claim_id)
claim.destroy
curl -X DELETE $ENDPOINT/queues/{queueName}/claims/{claimId} \
  -H "Content-type: application/json" \
  -H "X-Auth-Token: $TOKEN" \
  -H "Client-ID: {clientId}"  \
  -H "Accept: application/json" \
  -H "X-Project-Id: {projectId}"

Delete messages#

To delete a message after it has been used or completed and is no longer needed, avoiding duplicate work by other consumers of the queue:

QueueName queueName = new QueueName("{queue_name}");
MessageId messageId = new MessageId("message_id");
await cloudQueuesProvider.DeleteMessageAsync(queueName, messageId, null, CancellationToken.None);
// Not currently supported by this SDK
MessageApi messageApi =
    marconiApi.getMessageApi("{region}", "{clientId}", "{queueName}");

List<String> messageIds = ImmutableList.of("{messageId}");

messageApi.delete(messageIds);
// Not currently supported by this SDK
// Delete claimed messages.
foreach ($claimedMessages as $claimedMessage) {
   $claimId = $claimedMessage->getClaimIdFromHref();
   $claimedMessage->delete($claimId);
}
queue = pyrax.queues.get("sample_queue")

# If the message has not been claimed:
queue.delete_message(message_id)

# If the message has been claimed, you need to include the claim_id:
queue.delete_message(message_id, claim_id="01234356789abcdef")
message.destroy
curl -X DELETE $ENDPOINT/queues/{queueName}/messages/{messageId} \
  -H "Content-type: application/json" \
  -H "X-Auth-Token: $TOKEN" \
  -H "Client-ID: {clientId}" \
  -H "Accept: application/json" \
  -H "X-Project-Id: {projectId}"

More information#

This quickstart is intentionally brief, demonstrating only a few basic operations. To learn more about interacting with Rackspace cloud services, explore the following sites: