Queue Splitting

Sharing queues by splitting messages between multiple clients and the cluster

If your application consumes messages from a queue service, you should choose a configuration that matches your intention:

  1. Running your application with mirrord without any special configuration will result in your local application competing with the deployed application (and potentially other mirrord runs by teammates) for queue messages.

  2. Running your application with copy_target + scale_down will result in the deployed application not consuming any messages, and your local application being the exclusive consumer of queue messages (see the sketch after this list).

  3. If you want to control which messages will be consumed by the deployed application and which ones will reach your local application, set up queue splitting for the relevant target and define a message filter in the mirrord configuration. Messages that match the filter will reach your local application; messages that do not will reach either the deployed application or another teammate's local application, if they match that teammate's filter.
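For option 2, a minimal config sketch could look like this (the target name is illustrative; copy_target and its scale_down flag are mirrord configuration fields):

# Copy the target and scale down the original deployment, so the local app
# becomes the exclusive consumer of queue messages.
mkdir -p .mirrord
cat > .mirrord/mirrord.json <<'EOF'
{
  "operator": true,
  "target": "deployment/meme-app",
  "feature": {
    "copy_target": {
      "scale_down": true
    }
  }
}
EOF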

This feature is only available for users on the Team and Enterprise pricing plans.

Queue splitting is currently available for Amazon SQS and Kafka; support for RabbitMQ is coming soon. In this doc, the word "queue" is also used to refer to Kafka "topics".

How It Works

When a queue splitting session starts, the mirrord operator patches the target workload (e.g. deployment or rollout) to consume messages from a different, temporary queue. That temporary queue is exclusive to the target workload. Similarly, the local application is reconfigured to consume messages from its own exclusive temporary queue.

Once all temporary queues are prepared, the mirrord operator starts consuming messages from the original queue and publishing each message to one of the temporary queues. This routing is based on the message filters provided by the users in their mirrord configs.

First, we have a consumer app reading messages from an SQS queue:

A K8s application that consumes messages from an SQS queue

When the first mirrord SQS splitting session starts, two temporary queues are created (one for the target deployed in the cluster, one for the user's local application), and the mirrord operator routes messages according to the user's filter:

One SQS splitting session

If a second user then starts a mirrord SQS splitting session on the same queue, a third temporary queue is created (for the second user's local application). The mirrord operator includes the new queue and the second user's filter in the routing logic.

Two SQS splitting sessions

If the filters defined by the two users both match some message, one of the users will receive the message at random.

Temporary queues are managed by the mirrord operator and garbage collected in the background. After all queue splitting sessions end, the operator promptly deletes the allocated resources.

Please note that:

  1. Temporary queues created for the deployed targets will not be deleted as long as any of the target's pods still use them.

  2. In the case of SQS splitting, deployed targets will keep reading from their temporary queues as long as those queues have unconsumed messages.

Enabling Queue Splitting in Your Cluster

1. Enable SQS splitting in the Helm chart

Enable the operator.sqsSplitting setting in the mirrord-operator Helm chart.
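For example, if you manage the operator with Helm, a minimal sketch could look like this (the chart repository URL, release name, and namespace are assumptions; adjust them to your setup):

# Enable SQS splitting when installing or upgrading the operator chart.
helm repo add metalbear https://metalbear-co.github.io/charts
helm upgrade --install mirrord-operator metalbear/mirrord-operator \
  --namespace mirrord --create-namespace \
  --set operator.sqsSplitting=true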

2. Authenticate and authorize the mirrord operator

The mirrord operator needs to be able to perform operations on the SQS queues. To do this, it builds an SQS client using the default credentials provider chain.

The easiest way to provide the credentials for the operator is with IAM role assumption. For that, an IAM role with an appropriate policy has to be assigned to the operator's service account. Please follow AWS's documentation on how to do that. Note that the operator's service account can be annotated with the IAM role's ARN via the sa.roleArn setting in the mirrord-operator Helm chart.
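For example, assuming you have already created a suitable IAM role (the role name and ARN below are illustrative):

# Annotate the operator's service account with the IAM role to assume.
helm upgrade mirrord-operator metalbear/mirrord-operator --reuse-values \
  --set sa.roleArn="arn:aws:iam::111122223333:role/mirrord-operator-sqs"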

Some of the SQS permissions are needed for the actual queues you would like to split, and some are only needed for the temporary queues managed by the operator. Here is an overview:

SQS Permission     | Needed for your queues | Needed for temporary queues
-------------------|------------------------|----------------------------
GetQueueUrl        | ✓                      |
ListQueueTags      | ✓                      |
ReceiveMessage     | ✓                      |
DeleteMessage      | ✓                      |
GetQueueAttributes | ✓                      | ✓
CreateQueue        |                        | ✓
TagQueue           |                        | ✓
SendMessage        |                        | ✓
DeleteQueue        |                        | ✓

Here is a short explanation of each required permission:

  • sqs:GetQueueUrl: the operator finds queue names to split in the provided source, and then it fetches the URL from SQS in order to make other API calls.

  • sqs:GetQueueAttributes: the operator queries your queue's attributes, in order to clone these attributes to all derived temporary queues. It also reads the attributes of the temporary queues, in order to check the number of remaining messages.

  • sqs:ListQueueTags: the operator queries your queue's tags, in order to clone these tags to all derived temporary queues.

  • sqs:ReceiveMessage: the operator reads messages from the queues you split.

  • sqs:DeleteMessage: after reading a message and forwarding it to a temporary queue, the operator deletes the message from the split queue.

  • sqs:CreateQueue: the operator creates temporary queues in your SQS account.

  • sqs:TagQueue: the operator sets tags on the temporary queues.

  • sqs:SendMessage: the operator sends messages to the temporary queues.

  • sqs:DeleteQueue: the operator deletes stale temporary queues in the background.

This is an example of a policy that gives the operator's role the minimal permissions it needs to split a queue called ClientUploads:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:ListQueueTags",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage"
      ],
      "Resource": [
        "arn:aws:sqs:eu-north-1:314159265359:ClientUploads"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:CreateQueue",
        "sqs:TagQueue",
        "sqs:SendMessage",
        "sqs:GetQueueAttributes",
        "sqs:DeleteQueue"
      ],
      "Resource": "arn:aws:sqs:eu-north-1:314159265359:mirrord-*"
    }
  ]
}
  • The first statement gives the role the permissions it needs for your original queues.

    Instead of specifying the queues you would like to split in the first statement, you could alternatively make that statement apply to all resources in the account, and limit the queues it covers using conditions instead of resource names. For example, you could add a condition that makes the statement apply only to queues tagged splittable=true or env=dev, and set those tags on all queues you would like to allow the operator to split (see the sketch after this list).

  • The second statement in the example gives the role the permissions it needs for the temporary queues. Since all the temporary queues created by mirrord are created with the name prefix mirrord-, that statement in the example is limited to resources with that prefix in their name.

    If you would like to limit the second statement with conditions instead of (only) with the resource name, you can set a condition that requires a tag, and in the MirrordWorkloadQueueRegistry resource you can specify for each queue tags that mirrord will set for temporary queues that it creates for that original queue (see relevant section).
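Here is a sketch of the tag-based variant of the first statement. It assumes your splittable queues are tagged splittable=true; the role and policy names are illustrative, and aws:ResourceTag is a standard IAM condition key:

# Grant the original-queue permissions on all queues tagged splittable=true.
cat > sqs-split-by-tag.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:ListQueueTags",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage"
      ],
      "Resource": "*",
      "Condition": {
        "StringEquals": { "aws:ResourceTag/splittable": "true" }
      }
    }
  ]
}
EOF
aws iam put-role-policy --role-name mirrord-operator-sqs \
  --policy-name sqs-split-by-tag --policy-document file://sqs-split-by-tag.json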

If the queue messages are encrypted, the operator's IAM role should also have the following permissions:

  • kms:Encrypt

  • kms:Decrypt

  • kms:GenerateDataKey

3. Authorize deployed consumers

In order to be targeted with SQS splitting, a deployed consumer must be able to use the temporary queues created by mirrord. For example, if the consumer application retrieves the queue's URL based on its name, lists the queue's tags, and consumes and deletes messages from the queue, it must be able to do the same on a temporary queue.

Any temporary queues managed by mirrord are created with the same policy as the original queues they are splitting (with the single change of updating the queue name in the policy). Therefore, access control based on SQS policies should automatically be taken care of.

However, if the consumer's access to the queue is controlled by an IAM policy (and not an SQS policy, see SQS docs), you will need to adjust it.
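A sketch of such an adjustment (the role and policy names are illustrative; the ARN reuses the account and region from the policy example above): grant the consumer its usual SQS actions on mirrord's temporary queues as well:

# Allow the consumer's existing SQS actions on mirrord's temporary queues too.
aws iam put-role-policy --role-name meme-app-consumer \
  --policy-name allow-mirrord-temp-queues \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Action": [
        "sqs:GetQueueUrl",
        "sqs:ListQueueTags",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:eu-north-1:314159265359:mirrord-*"
    }]
  }'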

4. Provide application context

On operator installation with operator.sqsSplitting enabled, a new CustomResource type is defined in your cluster — MirrordWorkloadQueueRegistry. Users with permissions to get CRDs can verify its existence with kubectl get crd mirrordworkloadqueueregistries.queues.mirrord.metalbear.co. Before you can run sessions with SQS splitting, you must create a queue registry for the desired target. This is because the queue registry contains additional application context required by the mirrord operator. For example, the operator needs to know which environment variables contain the names of the SQS queues to split.

See an example queue registry defined for a deployment meme-app living in namespace meme:

apiVersion: queues.mirrord.metalbear.co/v1alpha
kind: MirrordWorkloadQueueRegistry
metadata:
  name: meme-app-q-registry
  namespace: meme
spec:
  consumer:
    name: meme-app
    workloadType: Deployment
    container: main
  queues:
    meme-queue:
      queueType: SQS
      nameSource:
        envVar: INCOMING_MEME_QUEUE_NAME
      tags:
        tool: mirrord
    ad-queue:
      queueType: SQS
      nameSource:
        envVar: AD_QUEUE_NAME
      tags:
        tool: mirrord

The registry above says that:

  1. It provides context for container main running in deployment meme-app in namespace meme.

  2. The container consumes two SQS queues. Their names are read from environment variables INCOMING_MEME_QUEUE_NAME and AD_QUEUE_NAME.

  3. The SQS queues can be referenced in a mirrord config under IDs meme-queue and ad-queue, respectively.

  4. When creating a temporary queue derived from either of the two queues, the mirrord operator should add the tag tool=mirrord.
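To create the registry, save it to a file and apply it with kubectl (the file name is illustrative):

# Apply the registry and verify it exists in the target's namespace.
kubectl apply -f meme-app-q-registry.yaml
kubectl get mirrordworkloadqueueregistries.queues.mirrord.metalbear.co -n meme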

Link the registry to the deployed consumer

The queue registry is a namespaced resource, so it can only reference a consumer deployed in the same namespace. The reference is specified with spec.consumer:

  • name — name of the Kubernetes workload of the deployed consumer.

  • workloadType — type of the Kubernetes workload of the deployed consumer. Right now only consumers deployed in deployments and rollouts are supported.

  • container — name of the exact container running in the workload. This field is optional. If you omit it, the registry will reference all of the workload's containers.

Describe consumed queues in the registry

The queue registry describes SQS queues consumed by the referenced consumer. The queues are described in entries of the spec.queues object.

The entry's key can be arbitrary, as it will only be referenced from the user's mirrord config.

The entry's value is an object describing a single SQS queue or multiple SQS queues consumed by the workload (a sketch combining the optional fields follows this list):

  • nameSource describes which environment variables contain the names/URLs of the consumed queues. Either the envVar or the regexPattern field is required.

    • envVar stores the name of a single environment variable.

    • regexPattern selects multiple environment variables based on a regular expression.

  • fallbackName stores an optional fallback name/URL, in case nameSource is not found in the workload spec. nameSource will still be used to inject the name/URL of the temporary queue.

  • namesFromJsonMap specifies how to process the values of environment variables that contain queue names/URLs. If set to true, the values of all selected variables will be parsed as JSON objects with string values, and all values in these objects will be treated as queue names/URLs. If set to false, the values of all selected variables will be treated directly as queue names/URLs. Defaults to false.

  • tags specifies additional tags to be set on all created temporary queues.

  • sns specifies whether the queues contain SQS messages created from SNS notifications. If set to true, message bodies will be parsed and matched against users' filters, as SNS notification attributes are found in the SQS message body. If set to false, message attributes will be matched against users' filters. Defaults to false.
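Here is that sketch (all names and values are illustrative; field semantics follow the descriptions above):

# A registry demonstrating regexPattern, fallbackName, namesFromJsonMap and sns.
kubectl apply -f - <<'EOF'
apiVersion: queues.mirrord.metalbear.co/v1alpha
kind: MirrordWorkloadQueueRegistry
metadata:
  name: meme-app-q-registry-advanced
  namespace: meme
spec:
  consumer:
    name: meme-app
    workloadType: Deployment
  queues:
    notification-queues:
      queueType: SQS
      nameSource:
        # Selects every env var whose name matches this regex.
        regexPattern: "^NOTIFICATION_QUEUE_.+$"
      tags:
        tool: mirrord
    sns-events:
      queueType: SQS
      nameSource:
        envVar: SNS_EVENTS_QUEUE_NAME
      # Used if SNS_EVENTS_QUEUE_NAME is missing from the workload spec.
      fallbackName: sns-events-queue
      # Filters match on the message body, where SNS notification attributes live.
      sns: true
    user-uploads:
      queueType: SQS
      nameSource:
        envVar: USER_UPLOAD_QUEUES_JSON
      # The env var value is a JSON object; its string values are queue names/URLs.
      namesFromJsonMap: true
EOF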

Setting a Filter for a mirrord Run

Once cluster setup is done, mirrord users can start running sessions with queue message filters in their mirrord configuration files. feature.split_queues is the configuration field they need to specify in order to filter queue messages. Directly under it, mirrord expects a mapping from queue ID (as defined in the registry) to a queue filter definition.

A filter definition contains two fields:

  • queue_type — SQS or Kafka

  • message_filter — mapping from message attribute (SQS) or header (Kafka) name to a regex for its value. The local application will only see queue messages that have all of the specified message attributes/headers.

An empty message_filter is treated as a match-none directive.

See example configurations below:

{
  "operator": true,
  "target": "deployment/meme-app/container/main",
  "feature": {
    "split_queues": {
      "meme-queue": {
        "queue_type": "SQS",
        "message_filter": {
          "author": "^me$",
          "level": "^(beginner|intermediate)$"
        }
      },
      "ad-queue": {
        "queue_type": "SQS",
        "message_filter": {}
      },
      "views-topic": {
        "queue_type": "Kafka",
        "message_filter": {
          "author": "^me$",
          "source": "^my-session-"
        }
      }
    }
  }
}

In the example above, the local application:

  • Will receive a subset of messages from the SQS queues described in the registry under ID meme-queue. All received messages will have an attribute author with the value me, AND an attribute level with the value beginner or intermediate.

  • Will receive no messages from SQS queues described in the registry under ID ad-queue.

  • Will receive a subset of messages from the Kafka topic with ID views-topic. All received messages will have a header author with the value me, AND a header source with a value starting with my-session- (e.g. my-session-844cb78789-2fmsw).

FAQ

How do I authenticate the operator's Kafka client with an SSL certificate?

An example MirrordKafkaClientConfig would look as follows:

apiVersion: queues.mirrord.metalbear.co/v1alpha
kind: MirrordKafkaClientConfig
metadata:
  name: ssl-auth
  namespace: mirrord
spec:
  properties:
  # Contents of the PEM file with client certificate.
  - name: ssl.certificate.pem
    value: "..."
  # Contents of the PEM file with client private key.
  - name: ssl.key.pem
    value: "..."
  # Contents of the PEM file with CA.
  - name: ssl.ca.pem
    value: "..."
  # Password for the client private key (if password protected).
  - name: ssl.key.password
    value: "..."

Alternatively, you can store the credentials in a secret and have them loaded into the config automatically:

apiVersion: v1
kind: Secret
metadata:
  name: mirrord-kafka-ssl
  namespace: mirrord
type: Opaque
data:
  ssl.certificate.pem: "..."
  ssl.key.pem: "..."
  ssl.ca.pem: "..."
  ssl.key.password: "..."

---
apiVersion: queues.mirrord.metalbear.co/v1alpha
kind: MirrordKafkaClientConfig
metadata:
  name: ssl-auth
  namespace: mirrord
spec:
  loadFromSecret: mirrord/mirrord-kafka-ssl
  properties: []
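One way to create such a secret is directly from the PEM files (the file names are illustrative and match the extraction example in the next answer):

# Create the secret holding the Kafka SSL credentials.
kubectl create secret generic mirrord-kafka-ssl -n mirrord \
  --from-file=ssl.certificate.pem=client-cert.pem \
  --from-file=ssl.key.pem=client-key.pem \
  --from-file=ssl.ca.pem=ca-cert.pem \
  --from-literal=ssl.key.password='changeit'  # only if the key is password protected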

How do I authenticate the operator's Kafka client with a Java KeyStore?

The mirrord operator does not support direct use of JKS files. In order to use JKS files with Kafka splitting, first extract all necessary certificates and keys to PEM files. You can do it like this:

# Convert keystore.jks to PKCS12 format.
keytool -importkeystore \
  -srckeystore keystore.jks \
  -srcstoretype JKS \
  -destkeystore keystore.p12 \
  -deststoretype PKCS12

# Extract client certificate PEM from the converted keystore.
openssl pkcs12 -in keystore.p12 -clcerts -nokeys -out client-cert.pem

# Extract client private key PEM from the converted keystore.
openssl pkcs12 -in keystore.p12 -nocerts -nodes -out client-key.pem

# Convert truststore.jks to PKCS12 format.
keytool -importkeystore \
  -srckeystore truststore.jks \
  -srcstoretype JKS \
  -destkeystore truststore.p12 \
  -deststoretype PKCS12

# Extract CA PEM from the converted truststore.
openssl pkcs12 -in truststore.p12 -nokeys -out ca-cert.pem

Then, follow the guide for authenticating with an SSL certificate.

Troubleshooting SQS splitting

If you're trying to use SQS splitting and are facing difficulties, here are some steps you can go through to identify and hopefully solve the problem.

First, some generally applicable steps:

  1. Make sure a MirrordWorkloadQueueRegistry exists for the workload you're targeting:

    kubectl describe mirrordworkloadqueueregistries.queues.mirrord.metalbear.co -n <target-namespace>
  2. Note that the queue IDs in the mirrord configuration have to match the queue IDs in the MirrordWorkloadQueueRegistry of the target you use.

  3. Get the logs from the mirrord-operator, in case it becomes necessary for the mirrord team to look into your issue, like this:

    kubectl logs -n mirrord -l app==mirrord-operator --tail -1 > /tmp/mirrord-operator-$(date +"%Y-%m-%d_%H-%M-%S").log

    If that fails, you might not have permissions to the operator's logs.

    To get especially helpful logs, you can change the log level for SQS splitting, then reproduce the issue to get the relevant logs. This can be achieved by upgrading the Helm chart and setting the operator.logLevel Helm value to mirrord=info,operator=info,operator_sqs_splitting::forwarder=trace:

    helm upgrade mirrord-operator metalbear/mirrord-operator --reuse-values --set operator.logLevel="mirrord=info\,operator=info\,operator_sqs_splitting::forwarder=trace"

    or by setting the RUST_LOG environment variable in the operator’s deployment to mirrord=info,operator=info,operator_sqs_splitting::forwarder=trace, e.g. by using kubectl edit deploy mirrord-operator -n mirrord.

  4. Some operations, like changing a MirrordWorkloadQueueRegistry of a workload while there are active sessions to that target, are not yet supported, and can lead to a bad state for mirrord. If you've reached such a state:

    1. Delete all the mirrord SQS session resources of the affected target. Those resources, MirrordSqsSession, are not the same as MirrordWorkloadQueueRegistry. You can delete them with:

      kubectl get mirrordsqssessions.queues.mirrord.metalbear.co -n <TARGET-NAMESPACE> -o json \
      | jq -r '.items[] | select(.spec.queueConsumer.name=="<TARGET-WORKLOAD-NAME>" and .spec.queueConsumer.workloadType=="<TARGET-WORKLOAD-TYPE>") | .metadata.name' \
      | xargs -r -I {} kubectl delete mirrordsqssessions.queues.mirrord.metalbear.co -n <TARGET-NAMESPACE> {}
    2. Restart the mirrord operator, e.g.:

      kubectl rollout restart deployment mirrord-operator -n mirrord

If some (but not all) of the messages that should arrive at the local service arrive at the remote service

It's possible the target workload's restart is not complete yet, and there are still pods reading directly from the original queue (those will be pods that DO NOT have an operator.metalbear.co/patched label). You can wait a bit for them to be replaced with new pods, patched by mirrord, that read from a temporary queue created by mirrord, or you can delete them.

If all SQS sessions are over but the remote service still hasn't changed back to reading from the original queue

When there are no more queue splitting sessions to a target, the target workload will not immediately be changed back to read directly from the original queue. Instead, it will keep reading from the temporary queue until it's empty, so that no messages intended for the remote service are lost.

If the target workload doesn't change back within the expected time, check its logs and make sure it is consuming queue messages.

If you don't want to wait for the remote service to drain the temporary queue, and you don't care about losing those messages, you can set the operator.sqsSplittingLingerTimeout value in the operator's Helm chart to put a timeout on the draining of the temporary queue.
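For example (the value shown is illustrative; check the chart's documentation for the expected format):

# Set a timeout for draining the temporary queues after all sessions end.
helm upgrade mirrord-operator metalbear/mirrord-operator --reuse-values \
  --set operator.sqsSplittingLingerTimeout=60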

If the service is consuming messages correctly and the temporary queue is already empty, but the target application still isn't restored to its original state, please try restarting the application, deleting any lingering MirrordSqsSession objects, and, if possible, restarting the mirrord operator.
