Queue Splitting
Sharing queues by splitting messages between multiple clients and the cluster
If your application consumes messages from a queue service, you should choose a configuration that matches your intention:
Running your application with mirrord without any special configuration will result in your local application competing with the deployed application (and potentially other mirrord runs by teammates) for queue messages.
Running your application with copy_target + scale_down will result in the deployed application not consuming any messages, and your local application being the exclusive consumer of queue messages.
If you want to control which messages will be consumed by the deployed application and which ones will reach your local application, set up queue splitting for the relevant target and define a message filter in the mirrord configuration. Messages that match the filter will reach your local application, and messages that do not will reach either the deployed application or another teammate's local application, if they match that teammate's filter.
This feature is available to users on the Team and Enterprise pricing plans.
Queue splitting is currently available for Amazon SQS, Kafka, RabbitMQ and Google Cloud Pub/Sub. The word "queue" in this doc is used to also refer to "topic" in the context of Kafka and "subscription" in the context of Google Cloud Pub/Sub.
How It Works
When a queue splitting session starts, the mirrord operator patches the target workload (e.g. deployment or rollout) to consume messages from a different, temporary queue. That temporary queue is exclusive to the target workload. Similarly, the local application is reconfigured to consume messages from its own exclusive temporary queue.
Queue splitting requires that the application read the queue name from an environment variable. This lets the operator override the environment variable to change the queue that the application reads from.
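In practice this means the consumer should look up its queue name at startup rather than hard-coding it. A minimal sketch, assuming a Python consumer: the variable name INCOMING_MEME_QUEUE_NAME is borrowed from the queue registry example further down, resolve_queue_name is a hypothetical helper, and the actual broker client is left out.

```python
import os
from typing import Mapping

# Hypothetical variable name; it must match what the queue registry points at.
QUEUE_ENV_VAR = "INCOMING_MEME_QUEUE_NAME"


def resolve_queue_name(env: Mapping[str, str] = os.environ) -> str:
    """Return the name of the queue this consumer should read from.

    The name is looked up at startup instead of being hard-coded, so when the
    operator overrides the variable, the consumer reads from the temporary
    queue without any code change.
    """
    try:
        return env[QUEUE_ENV_VAR]
    except KeyError:
        raise RuntimeError(f"{QUEUE_ENV_VAR} is not set") from None
```

A consumer written this way would pass the returned name to its SQS, Kafka, RabbitMQ or Pub/Sub client, whichever it uses.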
Once all temporary queues are prepared, the mirrord operator starts consuming messages from the original queue and publishing each of them to one of the temporary queues. This routing is based on message filters provided by the users in their mirrord configs.
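The routing decision can be pictured with the following sketch. It is not the operator's code: it assumes each user's filter maps an attribute (or header) name to a regex, that a message matches only if it carries every filtered attribute with a matching value, and that regexes are searched unanchored. Messages that match no filter go to the deployed target's temporary queue, and when several filters match, one user is picked at random.

```python
import random
import re
from typing import Mapping, Optional

# Attribute/header name -> regex for its value (assumed filter shape).
Filter = Mapping[str, str]


def matches(message_filter: Filter, attributes: Mapping[str, str]) -> bool:
    """True if the message has every filtered attribute and each value matches."""
    return all(
        name in attributes and re.search(pattern, attributes[name]) is not None
        for name, pattern in message_filter.items()
    )


def route(attributes: Mapping[str, str],
          user_filters: Mapping[str, Filter],
          rng: Optional[random.Random] = None) -> Optional[str]:
    """Return the user whose temporary queue gets the message.

    None means no filter matched, so the message goes to the deployed
    target's temporary queue. Ties between users are broken at random.
    """
    rng = rng or random.Random()
    candidates = [user for user, f in user_filters.items() if matches(f, attributes)]
    if not candidates:
        return None
    return rng.choice(candidates)
```

For example, with filters {"alice": {"author": "^alice$"}}, a message whose author attribute is alice is routed to alice's queue, while a message without that attribute falls through to the deployed workload.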
First, we have a consumer app reading messages from an SQS queue:
When the first mirrord SQS splitting session starts, two temporary queues are created (one for the target deployed in the cluster, one for the user's local application), and the mirrord operator routes messages according to the user's filter:
If a second user then starts a mirrord SQS splitting session on the same queue, a third temporary queue is created (for the second user's local application). The mirrord operator includes the new queue and the second user's filter in the routing logic.
If the filters defined by the two users both match some message, one of the users will receive the message at random.
First, we have a consumer app reading messages from a Kafka queue:
When the first mirrord Kafka splitting session starts, two temporary queues are created (one for the target deployed in the cluster, one for the user's local application), and the mirrord operator routes messages according to the user's filter:
If a second user then starts a mirrord Kafka splitting session on the same queue, a third temporary queue is created (for the second user's local application). The mirrord operator includes the new queue and the second user's filter in the routing logic.
If the filters defined by the two users both match some message, one of the users will receive the message at random.
First, we have a consumer app reading messages from a RabbitMQ queue:
When the first mirrord RabbitMQ splitting session starts, two temporary queues are created (one for the target deployed in the cluster, one for the user's local application), and the mirrord operator routes messages according to the user's filter:
If a second user then starts a mirrord RabbitMQ splitting session on the same queue, a third temporary queue is created (for the second user's local application). The mirrord operator includes the new queue and the second user's filter in the routing logic.
If the filters defined by the two users both match some message, one of the users will receive the message at random.
First, we have a consumer app reading messages from a Google Cloud Pub/Sub subscription.
When the first mirrord Pub/Sub splitting session starts, the operator creates temporary topics and subscriptions (one set for the target deployed in the cluster, one for the user's local application), and routes messages according to the user's filter.
If a second user then starts a mirrord Pub/Sub splitting session on the same subscription, an additional temporary topic and subscription are created for the second user's local application. The operator includes the new subscription and the second user's filter in the routing logic.
If the filters defined by the two users both match some message, one of the users will receive the message at random.
Temporary queues are managed by the mirrord operator and garbage collected in the background. After all queue splitting sessions end, the operator promptly deletes the allocated resources.
Please note that:
Temporary queues created for the deployed targets will not be deleted as long as any of the targets' pods still use them.
In case of SQS splitting, deployed targets will keep reading from the temporary queues as long as their temporary queues have unconsumed messages.
For Google Cloud Pub/Sub, the operator creates temporary topics and subscriptions. The target workload's subscription environment variable is patched to read from a temporary subscription, while the operator drains the original subscription and forwards messages through temporary topics.
Enabling Queue Splitting in Your Cluster
Enable SQS splitting in the Helm chart
Enable the operator.sqsSplitting setting in the mirrord-operator Helm chart.
Authenticate and authorize the mirrord operator
The mirrord operator will need to be able to perform operations on the SQS queues. To do this, it will build an SQS client, using the default credentials provider chain.
The easiest way to provide the credentials for the operator is with IAM role assumption. For that, an IAM role with an appropriate policy has to be assigned to the operator's service account. Please follow AWS's documentation on how to do that. Note that the operator's service account can be annotated with the IAM role's ARN with the sa.roleArn setting in the mirrord-operator Helm chart.
Some of the SQS permissions are needed for your actual queues that you would like to split, and some permissions are only needed for the temporary queues, managed by the operator. Here is an overview:
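GetQueueUrl: original queues
ListQueueTags: original queues
ReceiveMessage: original queues
DeleteMessage: original queues
GetQueueAttributes: both original and temporary queues
CreateQueue: temporary queues
TagQueue: temporary queues
SendMessage: temporary queues
DeleteQueue: temporary queues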
And here we provide a short explanation for each required permission:
sqs:GetQueueUrl: the operator finds queue names to split in the provided source, and then it fetches the URL from SQS in order to make other API calls.
sqs:GetQueueAttributes: the operator queries your queue's attributes, in order to clone these attributes to all derived temporary queues. It also reads the attributes of the temporary queues, in order to check the number of remaining messages.
sqs:ListQueueTags: the operator queries your queue's tags, in order to clone these tags to all derived temporary queues.
sqs:ReceiveMessage: the operator reads messages from the queues you split.
sqs:DeleteMessage: after reading a message and forwarding it to a temporary queue, the operator deletes the message from the split queue.
sqs:CreateQueue: the operator creates temporary queues in your SQS account.
sqs:TagQueue: the operator sets tags on the temporary queues.
sqs:SendMessage: the operator sends messages to the temporary queues.
sqs:DeleteQueue: the operator deletes stale temporary queues in the background.
This is an example of a policy that gives the operator's role the minimal permissions it needs to split a queue called ClientUploads:
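As a sketch of such a policy, the snippet below builds it as a Python dict and prints it as JSON, so the same two statements can be regenerated for other queue names. The region eu-north-1 and account ID 123456789012 are placeholders, operator_policy is a hypothetical helper, and the split of actions between the statements follows the permission overview above.

```python
import json

# Placeholder region and account ID; replace with your own.
REGION = "eu-north-1"
ACCOUNT_ID = "123456789012"


def sqs_arn(queue_name: str) -> str:
    return f"arn:aws:sqs:{REGION}:{ACCOUNT_ID}:{queue_name}"


def operator_policy(split_queues: list[str]) -> dict:
    """Two-statement IAM policy: original queues, then mirrord's temporary queues."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Statement 1: the original queues the operator may split.
                "Effect": "Allow",
                "Action": [
                    "sqs:GetQueueUrl",
                    "sqs:ListQueueTags",
                    "sqs:ReceiveMessage",
                    "sqs:DeleteMessage",
                    "sqs:GetQueueAttributes",
                ],
                "Resource": [sqs_arn(name) for name in split_queues],
            },
            {
                # Statement 2: temporary queues, all named with the "mirrord-" prefix.
                "Effect": "Allow",
                "Action": [
                    "sqs:CreateQueue",
                    "sqs:TagQueue",
                    "sqs:SendMessage",
                    "sqs:GetQueueAttributes",
                    "sqs:DeleteQueue",
                ],
                "Resource": sqs_arn("mirrord-*"),
            },
        ],
    }


print(json.dumps(operator_policy(["ClientUploads"]), indent=2))
```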
The first statement gives the role the permissions it needs for your original queues.
Instead of specifying the queues you would like to be able to split in the first statement, you could alternatively make that statement apply to all resources in the account, and limit the queues it applies to using conditions instead of resource names. For example, you could add a condition that makes the statement only apply to queues with the tag splittable=true or env=dev, and set those tags on all queues you would like to allow the operator to split.
The second statement in the example gives the role the permissions it needs for the temporary queues. Since all the temporary queues created by mirrord are created with the name prefix mirrord-, that statement in the example is limited to resources with that prefix in their name.
If you would like to limit the second statement with conditions instead of (only) with the resource name, you can set a condition that requires a tag, and in the MirrordWorkloadQueueRegistry resource you can specify, for each queue, tags that mirrord will set on the temporary queues it creates for that original queue (see relevant section).
If the queue messages are encrypted, the operator's IAM role should also have the following permissions:
kms:Encrypt
kms:Decrypt
kms:GenerateDataKey
Authorize deployed consumers
In order to be targeted with SQS splitting, a deployed consumer must be able to use the temporary queues created by mirrord. For example, if the consumer application retrieves the queue's URL based on its name, lists the queue's tags, and consumes and deletes messages from the queue, it must be able to do the same on a temporary queue.
Any temporary queues managed by mirrord are created with the same policy as the original queues they are splitting (with the single change of updating the queue name in the policy). Therefore, access control based on SQS policies should automatically be taken care of.
However, if the consumer's access to the queue is controlled by an IAM policy (and not an SQS policy, see SQS docs), you will need to adjust it.
Provide application context
On operator installation with operator.sqsSplitting enabled, a new CustomResource type is defined in your cluster — MirrordWorkloadQueueRegistry. Users with permissions to get CRDs can verify its existence with kubectl get crd mirrordworkloadqueueregistries.queues.mirrord.metalbear.co. Before you can run sessions with SQS splitting, you must create a queue registry for the desired target. This is because the queue registry contains additional application context required by the mirrord operator. For example, the operator needs to know which environment variables contain the names of the SQS queues to split.
See an example queue registry defined for a deployment meme-app living in namespace meme:
The registry above says that:
It provides context for container main running in deployment meme-app in namespace meme.
The container consumes two SQS queues. Their names are read from environment variables INCOMING_MEME_QUEUE_NAME and AD_QUEUE_NAME.
The SQS queues can be referenced in a mirrord config under IDs meme-queue and ad-queue, respectively.
When creating a temporary queue derived from either of the two queues, the mirrord operator should add the tag tool=mirrord.
Link the registry to the deployed consumer
The queue registry is a namespaced resource, so it can only reference a consumer deployed in the same namespace. The reference is specified with spec.consumer:
name: name of the Kubernetes workload of the deployed consumer.
workloadType: type of the Kubernetes workload of the deployed consumer. Right now only consumers deployed in deployments and rollouts are supported.
container: name of the exact container running in the workload. This field is optional. If you omit it, the registry will reference all of the workload's containers.
Describe consumed queues in the registry
The queue registry describes SQS queues consumed by the referenced consumer. The queues are described in entries of the spec.queues object.
The entry's key can be arbitrary, as it will only be referenced from the user's mirrord config.
The entry's value is an object describing single or multiple SQS queues consumed by the workload:
nameSource describes which environment variables contain names/URLs of the consumed queues. Either the envVar or the regexPattern field is required.
  envVar stores the name of a single environment variable.
  regexPattern selects multiple environment variables based on a regular expression.
fallbackName stores an optional fallback name/URL, in case nameSource is not found in the workload spec. nameSource will still be used to inject the name/URL of the temporary queue.
namesFromJsonMap specifies how to process the values of environment variables that contain queue names/URLs. If set to true, the values of all variables will be parsed as JSON objects with string values, and all values in these objects will be treated as queue names/URLs. If set to false, the values of all variables will be treated directly as queue names/URLs. Defaults to false.
tags specifies additional tags to be set on all created temporary queues.
sns specifies whether the queues contain SQS messages created from SNS notifications. If set to true, message bodies will be parsed and matched against users' filters, as SNS notification attributes are found in the SQS message body. If set to false, message attributes will be matched against users' filters. Defaults to false.
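To make the interaction of nameSource, fallbackName and namesFromJsonMap concrete, here is a rough Python model of how a registry entry could turn the consumer's environment into queue names. It is an illustration only: resolve_queue_names is hypothetical, and the unanchored regex search over variable names is an assumption rather than documented operator behavior.

```python
import json
import re
from typing import Mapping, Optional


def resolve_queue_names(env: Mapping[str, str],
                        env_var: Optional[str] = None,
                        regex_pattern: Optional[str] = None,
                        names_from_json_map: bool = False,
                        fallback_name: Optional[str] = None) -> list[str]:
    # nameSource: exactly one of envVar / regexPattern.
    if (env_var is None) == (regex_pattern is None):
        raise ValueError("set exactly one of env_var or regex_pattern")

    if env_var is not None:
        values = [env[env_var]] if env_var in env else []
    else:
        # Unanchored search over variable names (an assumption).
        values = [value for name, value in env.items() if re.search(regex_pattern, name)]

    if not values:
        # fallbackName only applies when nameSource matches nothing.
        return [fallback_name] if fallback_name is not None else []

    names: list[str] = []
    for value in values:
        if names_from_json_map:
            # The value is a JSON object; every string value in it is a queue name/URL.
            names.extend(json.loads(value).values())
        else:
            names.append(value)
    return names
```

With regexPattern set to _QUEUE_NAME$, for instance, both INCOMING_MEME_QUEUE_NAME and AD_QUEUE_NAME from the example registry would be picked up by a single entry.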
The mirrord operator can only read the consumer's environment variables if they are either:
defined directly in the workload's pod template, with the value defined in value or in valueFrom via a config map reference; or
loaded from config maps using envFrom.
Enable Kafka splitting in the Helm chart
Enable the operator.kafkaSplitting setting in the mirrord-operator Helm chart.
Configure the operator's Kafka client
The mirrord operator will need to be able to perform some operations on the Kafka cluster. To allow for properly configuring the operator's Kafka client, on operator installation with operator.kafkaSplitting enabled, a new CustomResource type is defined in your cluster — MirrordKafkaClientConfig. Users with permissions to get CRDs can verify its existence with kubectl get crd mirrordkafkaclientconfigs.queues.mirrord.metalbear.co.
The resource allows for specifying a list of properties for the Kafka client, like this:
When used by the mirrord operator for Kafka splitting, the example above will be resolved to the following .properties file:
This file will be used when creating a Kafka client for managing temporary queues, consuming messages from the original queue and producing messages to the temporary queues. A full list of available properties can be found here.
The group.id property will always be overwritten by the mirrord operator when resolving the .properties file.
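A rough sketch of that resolution step, assuming the properties are already a flat name-to-value mapping. render_properties and the sample values are hypothetical; the group ID is passed in explicitly here (the topics consumer resource described below tells the operator where to find it), no .properties escaping is done, and keys are sorted only to make the output deterministic.

```python
from typing import Mapping


def render_properties(properties: Mapping[str, str], group_id: str) -> str:
    """Render client properties as .properties text, forcing group.id."""
    resolved = dict(properties)
    resolved["group.id"] = group_id  # always overwritten, whatever the config says
    lines = [f"{key}={value}" for key, value in sorted(resolved.items())]
    return "\n".join(lines) + "\n"
```

For example, render_properties({"bootstrap.servers": "kafka:9092", "group.id": "ignored"}, "meme-consumers") yields the two lines bootstrap.servers=kafka:9092 and group.id=meme-consumers.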
MirrordKafkaClientConfig resources must always be created in the operator's namespace.
See the Additional Options section for more Kafka configuration info.
Authorize deployed consumers
In order to be targeted with Kafka splitting, a deployed consumer must be able to use the temporary queues created by mirrord. For example, if the consumer application describes the queue or reads messages from it, it must be able to do the same on a temporary queue. This might require extra actions on your side to adjust the authorization, for example based on the queue name prefix. See the Customizing Temporary Kafka Queue Names section for more info.
Provide application context
On operator installation with operator.kafkaSplitting enabled, a new CustomResource type is defined in your cluster — MirrordKafkaTopicsConsumer. Users with permissions to get CRDs can verify its existence with kubectl get crd mirrordkafkatopicsconsumers.queues.mirrord.metalbear.co. Before you can run sessions with Kafka splitting, you must create a topics consumer resource for the desired target. This is because the topics consumer resource contains additional application context required by the mirrord operator. For example, the operator needs to know which environment variables contain the names of the Kafka queues to split.
See an example topics consumer resource, for a meme app that consumes messages from a Kafka queue:
The topics consumer resource above says that:
It provides context for deployment meme-app in namespace meme.
The deployment consumes one queue. Its name is read from environment variable KAFKA_TOPIC_NAME in container consumer. The Kafka consumer group ID is read from environment variable KAFKA_GROUP_ID in container consumer.
The Kafka queue can be referenced in a mirrord config under ID views-topic.
Link the topics consumer resource to the deployed consumer
The topics consumer resource is namespaced, so it can only reference a Kafka consumer deployed in the same namespace. The reference is specified with the spec.consumer* fields, which cover the API version, kind, and name of the Kubernetes workload. For instance, to configure Kafka splitting of a consumer deployed in a stateful set kafka-notifications-worker, you would set:
The operator supports Kafka splitting on deployments, stateful sets, and Argo rollouts.
Describe consumed queues in the topics consumer resource
The topics consumer resource describes Kafka queues consumed by the referenced consumer. The queues are described in entries of the spec.topics list:
id can be arbitrary, as it will only be referenced from the user's mirrord config.
clientConfig stores the name of the MirrordKafkaClientConfig to use when making connections to the Kafka cluster.
nameSources stores a list of all occurrences of the queue name in the consumer workload's pod template.
groupIdSources stores a list of all occurrences of the consumer's Kafka group ID in the consumer workload's pod template. The operator will use the same group ID when consuming messages from the queue.
The mirrord operator can only read the consumer's environment variables if they are either:
defined directly in the workload's pod template, with the value defined in value or in valueFrom via a config map reference; or
loaded from config maps using envFrom.
Additional Options
Customizing Temporary Kafka Queue Names
Available since chart version 1.27 and operator version 3.114.0.
To serve Kafka splitting sessions, mirrord operator creates temporary queues in the Kafka cluster. The default format for their names is as follows:
mirrord-tmp-1234567890-fallback-topic-original-topic for the fallback queue (unfiltered messages, consumed by the deployed workload).
mirrord-tmp-0987654321-original-topic for the user queues (filtered messages, consumed by local applications running with mirrord).
Note that the random digits will be unique for each temporary queue created by the operator.
You can adjust the format of the created queue names to suit your needs (RBAC, security, policies, etc.), using the OPERATOR_KAFKA_SPLITTING_TOPIC_FORMAT environment variable of the mirrord operator, or the operator.kafkaSplittingTopicFormat Helm chart value. The default value is:
mirrord-tmp-{{RANDOM}}{{FALLBACK}}{{ORIGINAL_TOPIC}}
The provided format must contain the three variables: {{RANDOM}}, {{FALLBACK}} and {{ORIGINAL_TOPIC}}.
{{RANDOM}} will resolve to random digits.
{{FALLBACK}} will resolve either to the -fallback- literal or to the - literal.
{{ORIGINAL_TOPIC}} will resolve to the name of the original topic that is being split.
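A small Python model of this substitution may help when designing a custom format, for example one that must match a prefix-based ACL. It is a sketch, not the operator's implementation: temp_topic_name is hypothetical, the ten-digit random part is an assumption based on the example names above, and the fallback queue is assumed to get -fallback- while user queues get -.

```python
import secrets
from typing import Optional

DEFAULT_FORMAT = "mirrord-tmp-{{RANDOM}}{{FALLBACK}}{{ORIGINAL_TOPIC}}"
REQUIRED_VARIABLES = ("{{RANDOM}}", "{{FALLBACK}}", "{{ORIGINAL_TOPIC}}")


def temp_topic_name(original_topic: str,
                    fallback: bool,
                    fmt: str = DEFAULT_FORMAT,
                    random_digits: Optional[str] = None) -> str:
    """Expand a temporary topic name format."""
    # The format must contain all three variables.
    missing = [v for v in REQUIRED_VARIABLES if v not in fmt]
    if missing:
        raise ValueError(f"format is missing: {', '.join(missing)}")
    if random_digits is None:
        # Ten random digits (length assumed from the examples above).
        random_digits = "".join(secrets.choice("0123456789") for _ in range(10))
    # The original topic is substituted last, so its text is never re-expanded.
    return (
        fmt.replace("{{RANDOM}}", random_digits)
        .replace("{{FALLBACK}}", "-fallback-" if fallback else "-")
        .replace("{{ORIGINAL_TOPIC}}", original_topic)
    )
```

With the default format, temp_topic_name("original-topic", fallback=False, random_digits="0987654321") returns mirrord-tmp-0987654321-original-topic, the user queue name shown above.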
Reusing Kafka Client Configs
MirrordKafkaClientConfig resource supports property inheritance via spec.parent field. When resolving a resource config-A that has a parent config-B:
config-B is resolved into a .properties file.
For each property defined in config-A:
  If value is provided, it overrides any previous value of that property.
  If value is not provided (null), that property is removed.
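The inheritance rules can be modelled in a few lines of Python. This is a sketch under assumptions: the dicts only loosely mirror the CRD (a parent name plus a list of name/value properties), resolve_config is hypothetical, the config contents are made up, and parent cycles are not detected.

```python
from typing import Any, Mapping


def resolve_config(name: str, configs: Mapping[str, Mapping[str, Any]]) -> dict[str, str]:
    """Resolve a client config, applying its parent chain first."""
    config = configs[name]
    parent = config.get("parent")
    # Start from the parent's fully resolved properties, if there is a parent.
    resolved = resolve_config(parent, configs) if parent else {}
    for prop in config.get("properties", []):
        if prop.get("value") is None:
            resolved.pop(prop["name"], None)  # null value: remove the property
        else:
            resolved[prop["name"]] = prop["value"]  # provided value: override
    return resolved


configs = {
    "base-config": {
        "properties": [
            {"name": "bootstrap.servers", "value": "kafka.default.svc.cluster.local:9092"},
            {"name": "client.id", "value": "base"},
        ],
    },
    "with-client-id": {
        "parent": "base-config",
        "properties": [{"name": "client.id", "value": "mirrord-operator"}],
    },
    "without-client-id": {
        "parent": "base-config",
        "properties": [{"name": "client.id", "value": None}],
    },
}
```

Here resolve_config("with-client-id", configs) keeps the parent's bootstrap.servers and replaces client.id with mirrord-operator, while resolve_config("without-client-id", configs) drops client.id entirely.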
Below we have an example of two MirrordKafkaClientConfigs with an inheritance relation:
When used by the mirrord operator for Kafka splitting, the with-client-id config above will be resolved to the following .properties file:
Configuring Kafka Clients with Secrets
MirrordKafkaClientConfig also supports loading properties from a Kubernetes Secret, with the spec.loadFromSecret field. The value for spec.loadFromSecret is given in the form: <secret-namespace>/<secret-name>.
Each key-value entry defined in the secret's data will be included in the resulting .properties file. Property inheritance from the parent still occurs, and within each MirrordKafkaClientConfig, properties loaded from the secret are overwritten by those in spec.properties.
This means the priority of setting properties (from highest to lowest) is as follows:
child spec.properties
child spec.loadFromSecret
parent spec.properties
parent spec.loadFromSecret
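One way to read this priority order is as a stack of layers merged from lowest to highest priority, where later layers win and a null property value removes an entry. The sketch below assumes that model; merge_layers and the layer contents are hypothetical, and secret data is assumed to be already base64-decoded.

```python
from typing import Mapping, Optional, Sequence


def merge_layers(layers: Sequence[Mapping[str, Optional[str]]]) -> dict[str, str]:
    """Merge property layers given from lowest to highest priority."""
    resolved: dict[str, str] = {}
    for layer in layers:
        for key, value in layer.items():
            if value is None:
                resolved.pop(key, None)  # null in spec.properties removes the property
            else:
                resolved[key] = value  # higher-priority layer overrides
    return resolved


# Hypothetical contents of each source, lowest priority first.
parent_secret = {"sasl.username": "operator", "sasl.password": "from-parent-secret"}
parent_properties = {"bootstrap.servers": "kafka:9092", "sasl.password": "from-parent"}
child_secret = {"sasl.password": "from-child-secret"}
child_properties = {"client.id": "mirrord-operator"}

resolved = merge_layers([parent_secret, parent_properties, child_secret, child_properties])
```

In this example sasl.password ends up as from-child-secret: the child's secret outranks both parent layers, and the child's spec.properties does not set it.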
Below is an example for a MirrordKafkaClientConfig resource that references a secret:
Note that by default, mirrord operator has read access only to the secrets in the operator's namespace.
Configuring Custom Kafka Authentication
For authentication methods that cannot be handled just by setting client properties, we provide a separate field spec.authenticationExtra. The field allows for specifying custom authentication methods:
The example above configures IAM/OAUTHBEARER authentication with Amazon Managed Streaming for Apache Kafka. When the MSK_IAM kind is used, two additional properties are automatically merged into the configuration:
sasl.mechanism=OAUTHBEARER
security.protocol=SASL_SSL
To produce the authentication tokens, the operator will use the default credentials provider chain. The easiest way to provide the credentials for the operator is with IAM role assumption. For that, an IAM role with an appropriate policy has to be assigned to the operator's service account. Please follow AWS's documentation on how to do that. Note that the operator's service account can be annotated with the IAM role's ARN with the sa.roleArn setting in the mirrord-operator Helm chart.
Configuring Workload Restart
To inject the names of the temporary queues into the consumer workload, the operator always requires the workload to be restarted. Depending on cluster conditions, and the workload itself, this might take some time.
MirrordKafkaTopicsConsumer allows for specifying two more options for this:
spec.consumerRestartTimeout: specifies how long the operator should wait for a new pod to become ready after the workload restart is triggered. This allows for silencing timeout errors when the workload pods take a long time to start. Specified in seconds, defaults to 60s.
spec.splitTtl: specifies how long the consumer workload should remain patched after the last Kafka splitting session against it has finished. This allows for skipping the subsequent restart in case the next Kafka splitting session is started before the TTL elapses. Specified in seconds.
Enable RabbitMQ splitting in the Helm chart
Enable the operator.rmqSplitting setting in the mirrord-operator Helm chart.
Cluster Declaration
The mirrord operator needs a way to connect to your RabbitMQ cluster to consume and re-route messages according to filters. As part of operator installation with operator.rmqSplitting enabled, a new CustomResource type is defined in your cluster — MirrordPropertyList. Use this resource to define the cluster and queue connection parameters for splitting. MirrordPropertyList is modeled after the env and envFrom fields in a pod's container spec. You can:
Set values directly in the properties field using value.
Reference a single key from a ConfigMap or Secret using valueFrom.configMapKeyRef or valueFrom.secretKeyRef.
Include all keys from a ConfigMap or Secret using configMapRef or secretRef under propertiesFrom. An optional prefix is prepended to each key.
If you set a property in the properties field using value, the value must always be a string: value: '1' instead of value: 1.
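Since MirrordPropertyList is modelled after env and envFrom, its resolution can be pictured with the Python sketch below. The dict shapes are assumed from the field names above, resolve_property_list is hypothetical, ConfigMap and Secret data are assumed to be already decoded, and the rule that properties override propertiesFrom is an assumption by analogy with Kubernetes, where env takes precedence over envFrom.

```python
from typing import Any, Mapping

Store = Mapping[str, Mapping[str, str]]  # object name -> key -> value


def resolve_property_list(spec: Mapping[str, Any],
                          config_maps: Store,
                          secrets: Store) -> dict[str, str]:
    resolved: dict[str, str] = {}

    # propertiesFrom: copy every key of the referenced object, with an optional prefix.
    for source in spec.get("propertiesFrom", []):
        prefix = source.get("prefix", "")
        if "configMapRef" in source:
            data = config_maps[source["configMapRef"]["name"]]
        else:
            data = secrets[source["secretRef"]["name"]]
        for key, value in data.items():
            resolved[prefix + key] = value

    # properties: applied last, so they win over propertiesFrom (assumed).
    for prop in spec.get("properties", []):
        if "value" in prop:
            if not isinstance(prop["value"], str):
                raise TypeError(f"{prop['name']}: value must be a string, e.g. '1' rather than 1")
            resolved[prop["name"]] = prop["value"]
        else:
            ref = prop["valueFrom"]
            if "configMapKeyRef" in ref:
                key_ref, store = ref["configMapKeyRef"], config_maps
            else:
                key_ref, store = ref["secretKeyRef"], secrets
            resolved[prop["name"]] = store[key_ref["name"]][key_ref["key"]]

    return resolved
```

The string check mirrors the note above: a bare value: 1 is rejected, while value: '1' is accepted.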
You must create at least one MirrordPropertyList with your cluster properties inside of it.
If your application expects specific queue attributes (e.g. durable, or arguments like x-queue-type), create a MirrordPropertyList with those queue declaration properties.
Cluster Properties
scheme: Protocol used for the connection. Values: amqp or amqps. Default: amqp.
host (required): Hostname or IP address of the message broker. Type: string.
port: Network port the broker is listening on. Type: integer. Default: 5671 or 5672, according to scheme (5671 for amqps, 5672 for amqp).
username: Credential used to authenticate the connection. Type: string.
password: Secret key or password for the specified user. Type: string.
vhost: A logical isolation unit (virtual host) within the broker. Type: string. Default: '/'.
sasl.mechanism: Authentication strategy used during the handshake. Values: amqplain, anonymous, external, plain, or rabbit-cr-demo.
tls.crt: Public certificate (PEM format) used for client authentication. Type: string (PEM).
tls.key: Private key (PEM format) matching the client certificate. Type: string (PEM).
ca-certificates.crt: CA certificate(s) (PEM format) used to verify the broker's identity. Type: string (PEM).
client.*: Custom metadata or properties sent to the broker. Type: object / key-value pairs.
Queue Declare Properties
durable: If true, the queue survives a broker restart. Type: boolean. Default: false.
exclusive: If true, the queue can only be accessed by the current connection and will be deleted when that connection closes. Type: boolean. Default: false.
auto_delete: If true, the queue is deleted automatically when the last consumer unsubscribes. Type: boolean. Default: false.
arguments.*: Custom properties sent in the queue declaration. Type: object / key-value pairs.
Provide application context
On operator installation with operator.rmqSplitting enabled, a new CustomResource type is defined in your cluster — MirrordWorkloadQueueRegistry. Users with permissions to get CRDs can verify its existence with kubectl get crd mirrordworkloadqueueregistries.queues.mirrord.metalbear.co. Before you can run sessions with RabbitMQ splitting, you must create a queue registry for the desired target. This is because the queue registry contains additional application context required by the mirrord operator. For example, the operator needs to know which environment variables contain the names of the RabbitMQ queues to split.
See an example queue registry defined for a deployment meme-app living in namespace meme:
The registry above says that:
It provides context for container main running in deployment meme-app in namespace meme.
The cluster that hosts the queues has its properties defined in the meme-rmq-cluster MirrordPropertyList (in the meme namespace).
The container consumes two RabbitMQ queues. Their names are read from environment variables INCOMING_MEME_QUEUE_NAME and AD_QUEUE_NAME.
The queues can be referenced in a mirrord config under IDs meme-queue and ad-queue, respectively.
Link the registry to the deployed consumer
The queue registry is a namespaced resource, so it can only reference a consumer deployed in the same namespace. The reference is specified with spec.consumer:
name: name of the Kubernetes workload of the deployed consumer.
workloadType: type of the Kubernetes workload of the deployed consumer. Right now only consumers deployed in deployments and rollouts are supported.
container: name of the exact container running in the workload. This field is optional. If you omit it, the registry will reference all of the workload's containers.
Describe consumed queues in the registry
The queue registry describes RabbitMQ queues consumed by the referenced consumer. The queues are described in entries of the spec.queues object.
The entry's key can be arbitrary, as it will only be referenced from the user's mirrord config.
The entry's value is an object describing one or more RabbitMQ queues consumed by the workload:
clusterProperties (required) is the name of the MirrordPropertyList containing connection properties for the RabbitMQ cluster.
nameSource describes which environment variables contain names/URLs of the consumed queues. Either the envVar or the regexPattern field is required.
  envVar stores the name of a single environment variable.
  regexPattern selects multiple environment variables based on a regular expression.
fallbackName stores an optional fallback name/URL, in case nameSource is not found in the workload spec. nameSource will still be used to inject the name/URL of the temporary queue.
namesFromJsonMap specifies how to process the values of environment variables that contain queue names/URLs. If set to true, the values of all variables will be parsed as JSON objects with string values, and all values in these objects will be treated as queue names/URLs. If set to false, the values of all variables will be treated directly as queue names/URLs. Defaults to false.
queueProperties is the name of the MirrordPropertyList that contains parameters for the queue declaration (durable, queue type or any other attribute).
The mirrord operator can only read the consumer's environment variables if they are either:
defined directly in the workload's pod template, with the value defined in value or in valueFrom via a config map reference; or
loaded from config maps using envFrom.
Enable GCP Pub/Sub splitting in the Helm chart
Enable the operator.gcpPubsubSplitting setting in the mirrord-operator Helm chart.
Authenticate the mirrord operator
The mirrord operator needs access to the Google Cloud Pub/Sub API to create and manage temporary topics and subscriptions.
There are two ways to provide credentials:
Option A: Workload Identity (recommended)
Workload Identity binds a Kubernetes service account to a Google Cloud IAM service account. You can annotate the operator's service account with the GCP service account email using the sa.annotations setting in the Helm chart:
Option B: Service account JSON key
You can provide a service account JSON key through a MirrordPropertyList. Store the key in a Kubernetes Secret, then reference it from the property list:
Then reference this property list from the MirrordSplitConfig either per-queue (spec.queues[].clientConfig) or as a default for all Pub/Sub queues (spec.clientConfigs.googlePubSub).
Whichever method you choose, the IAM service account needs the following Pub/Sub permissions:
pubsub.subscriptions.consume
pubsub.subscriptions.get
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.publish
pubsub.subscriptions.create
pubsub.subscriptions.delete
A good starting point is to assign the roles/pubsub.editor role to the operator's service account, scoped to the relevant project.
Authorize deployed consumers
In order to be targeted with Pub/Sub splitting, a deployed consumer must be able to read from the temporary subscriptions created by mirrord. If the consumer's IAM permissions are scoped to specific subscription names, you will need to extend them to cover subscriptions with the mirrord-tmp- prefix. This prefix is customizable via the spec.tmpNameTemplate field in your MirrordSplitConfig resource.
Provide application context
On operator installation with operator.gcpPubsubSplitting enabled, a new CustomResource type is defined in your cluster - MirrordSplitConfig. Users with permissions to get CRDs can verify its existence with kubectl get crd mirrordsplitconfigs.queues.mirrord.metalbear.co.
Before you can run sessions with Pub/Sub splitting, you must create a MirrordSplitConfig for the desired target. This tells the operator which subscriptions to split and how the application discovers their names.
See an example MirrordSplitConfig defined for a deployment event-processor living in namespace events:
The MirrordSplitConfig above says that:
It targets the deployment event-processor in namespace events.
Temporary resources will be named with the mirrord-tmp- prefix (this is the default, shown here for clarity). You can change this prefix to scope IAM permissions.
The deployment consumes one Pub/Sub subscription, whose name is in environment variable PUBSUB_SUBSCRIPTION in container consumer.
The GCP project ID is in environment variable GCP_PROJECT_ID in container consumer.
The subscription can be referenced in a mirrord config under ID user-events.
Link the config to the deployed consumer
The MirrordSplitConfig is a namespaced resource. The target workload reference is specified with spec.targetRef:
apiVersion - API version of the Kubernetes workload (e.g. apps/v1).
kind - type of the workload. Supported: Deployment, StatefulSet, Rollout.
name - name of the workload.
Describe consumed subscriptions
Each entry in the spec.queues list describes one or more Pub/Sub subscriptions consumed by the workload:
id - arbitrary queue ID that developers reference from their mirrord config.
kind - must be GooglePubSub.
appConfig.subscription - how the application discovers the subscription name. Each entry can use:
  env - exact name of the environment variable containing the subscription ID.
  envLike - regex matching environment variable names.
  fallback - fallback subscription name, used if the variable is not found.
  valueSelector - a jq expression to extract the subscription name from the variable's value. Useful when the env var contains JSON or a compound string rather than a plain name.
  containers - limit to specific containers (optional, defaults to all).
appConfig.projectId - how the application discovers the GCP project ID. Uses the same structure as subscription.
clientConfig (optional) - name of a MirrordPropertyList containing GCP-specific connection properties. Can also be set at the top level in spec.clientConfigs.googlePubSub.
queueConfig (optional) - name of a MirrordPropertyList with additional configuration for temporary resources.
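To make the appConfig.subscription fields more concrete, here is a hypothetical sketch of how a lookup over them could behave. It is not the operator's implementation: the lookup order (env and envLike first, fallback only when nothing matched) and the full-match semantics for envLike are assumptions, and valueSelector is left out because it needs a jq engine.

```python
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class SubscriptionSource:
    """Hypothetical mirror of one appConfig.subscription entry."""
    env: Optional[str] = None        # exact variable name
    env_like: Optional[str] = None   # regex over variable names
    fallback: Optional[str] = None   # used when no variable matches
    containers: List[str] = field(default_factory=list)  # empty = all containers


def resolve_names(source: SubscriptionSource,
                  container_env: Dict[str, Dict[str, str]]) -> List[str]:
    """Return subscription names for one entry (assumed lookup order)."""
    found: List[str] = []
    for container, env in container_env.items():
        if source.containers and container not in source.containers:
            continue  # this entry is limited to other containers
        if source.env is not None and source.env in env:
            found.append(env[source.env])
        if source.env_like is not None:
            pattern = re.compile(source.env_like)
            # Assumption: the regex must match the whole variable name.
            found.extend(value for name, value in sorted(env.items())
                         if pattern.fullmatch(name))
    if not found and source.fallback is not None:
        found.append(source.fallback)
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(found))
```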
The mirrord operator can only read a consumer's environment variables if they are either:
defined directly in the workload's pod template, with the value set in value or in valueFrom via a config map reference; or
loaded from config maps using envFrom.
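A sketch of the rule above, assuming the standard Kubernetes container shape (env[].value, env[].valueFrom.configMapKeyRef, envFrom[].configMapRef with an optional prefix). Values from other sources, such as secretKeyRef or fieldRef, stay unresolved. The function name and the way config map data is passed in are hypothetical; the sketch only shows which definitions are visible from the pod template and its config maps.

```python
from typing import Any, Dict


def readable_env(container: Dict[str, Any],
                 config_maps: Dict[str, Dict[str, str]]) -> Dict[str, str]:
    """Resolve a container's env vars that come from literal values or config maps."""
    resolved: Dict[str, str] = {}
    # envFrom is applied first; explicit env entries override it, as in Kubernetes.
    for source in container.get("envFrom", []):
        ref = source.get("configMapRef")
        if ref is None:
            continue  # e.g. secretRef: outside what the operator can read
        prefix = source.get("prefix", "")
        for key, value in config_maps.get(ref["name"], {}).items():
            resolved[prefix + key] = value
    for var in container.get("env", []):
        name = var["name"]
        if "value" in var:
            resolved[name] = var["value"]
            continue
        ref = var.get("valueFrom", {}).get("configMapKeyRef")
        data = config_maps.get(ref["name"], {}) if ref is not None else {}
        if ref is not None and ref["key"] in data:
            resolved[name] = data[ref["key"]]
        else:
            # secretKeyRef, fieldRef, etc.: this entry overrides envFrom at
            # runtime, but its value is unknown, so drop any envFrom value.
            resolved.pop(name, None)
    return resolved
```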
Setting a Filter for a mirrord Run
Once cluster setup is done, mirrord users can start running sessions with queue message filters in their mirrord configuration files. feature.split_queues is the configuration field they need to specify in order to filter queue messages. Directly under it, mirrord expects a mapping from a queue or queue ID to a queue filter definition.
A filter definition contains the following fields:
queue_type - SQS, Kafka, RMQ or GCPPubSub.
message_filter - mapping from message attribute (SQS, GCP Pub/Sub) or header (Kafka, RabbitMQ) name to a regex for its value. The local application will only see queue messages that have all of the specified message attributes/headers, with values matching their regexes.
jq_filter - supported for the SQS and GCPPubSub queue types.
  For SQS, it runs a jq program on the JSON representation of the SQS Message object.
  For GCP Pub/Sub, it runs a jq program on the JSON representation of the PubsubMessage object.
  A message matches if the jq program outputs true.
If both message_filter and jq_filter are specified for the same queue, both must match for a message to be matched.
When choosing which SQS attributes, Kafka headers or Pub/Sub attributes to filter on, first check whether your framework, messaging client, or observability library already propagates message metadata for you. Many modern stacks can forward tracing-related context out of the box, especially for Kafka headers. Prefer enabling that before adding manual propagation code.
An empty message_filter without a jq_filter is treated as a match-none directive.
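The matching rules above can be summarized in a short Python sketch. It assumes message_filter regexes are searched anywhere in the value (unanchored, so ^ and $ are needed for an exact match); that is an assumption, not something stated here. jq_filter is represented by a plain Python predicate, since evaluating real jq needs an external engine.

```python
import re
from typing import Callable, Dict, Mapping, Optional

JqPredicate = Callable[[dict], bool]  # stand-in for a jq program returning true/false


def matches(attributes: Mapping[str, str],
            message: dict,
            message_filter: Optional[Dict[str, str]] = None,
            jq_filter: Optional[JqPredicate] = None) -> bool:
    """Decide whether a message should be routed to the local application."""
    if not message_filter and jq_filter is None:
        # An empty message_filter with no jq_filter matches nothing
        # (an absent filter is treated the same way here, as an assumption).
        return False
    if message_filter:
        for name, pattern in message_filter.items():
            value = attributes.get(name)
            # Every listed attribute/header must exist and match its regex.
            if value is None or re.search(pattern, value) is None:
                return False
    if jq_filter is not None and not jq_filter(message):
        # When both filters are set, both must match.
        return False
    return True
```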
See example configurations below:
In the example above, the local application:
Will receive a subset of messages from SQS queues described in the registry under ID meme-queue. All received messages will have an SQS attribute baggage containing mirrord-session=alice.
Will receive a subset of messages from SQS queues described in the registry under ID orders-queue. All received messages will have a JSON body with "important": true.
Will receive no messages from SQS queues described in the registry under ID ad-queue.
Will receive a subset of messages from Kafka queue with ID views-topic. All received messages will have a Kafka header baggage containing mirrord-session=alice.
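A config matching the meme-queue, ad-queue and views-topic descriptions can be sketched as a Python dict and serialized to JSON. This assumes the mirrord config file is JSON. The baggage regex is padded with .* so it expresses "contains" whether or not mirrord anchors the pattern. The orders-queue entry is left out because its jq_filter depends on the exact JSON shape of the SQS Message object.

```python
import json


def build_split_queues_config() -> dict:
    """mirrord config with the SQS and Kafka queue filters described above."""
    baggage = {"baggage": ".*mirrord-session=alice.*"}
    return {
        "feature": {
            "split_queues": {
                # SQS attribute filter: only messages whose baggage carries the session.
                "meme-queue": {"queue_type": "SQS", "message_filter": baggage},
                # Empty message_filter and no jq_filter: receive nothing from this queue.
                "ad-queue": {"queue_type": "SQS", "message_filter": {}},
                # Kafka header filter with the same regex.
                "views-topic": {"queue_type": "Kafka", "message_filter": baggage},
            }
        }
    }


if __name__ == "__main__":
    # Redirect the output to the config file you pass to mirrord, e.g. mirrord.json.
    print(json.dumps(build_split_queues_config(), indent=2))
```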
In the example above, the local application:
Will receive messages from SQS queue orders-queue only when the message body is valid JSON and contains "client": "a".
Will receive messages from SQS queue fifo-orders-queue only when the SQS message attribute client has the value a.
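The body condition in the first case has two parts: the body must parse as JSON, and the parsed object must contain "client": "a". A Python stand-in for that check, useful for reasoning about which messages would match; it is not jq, and it assumes the check receives the raw message body as a string.

```python
import json


def body_has_client_a(body: str) -> bool:
    """Stand-in for the jq condition: a JSON object body with "client": "a"."""
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        return False  # non-JSON bodies never match
    return isinstance(parsed, dict) and parsed.get("client") == "a"
```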
In the example above, the local application will receive messages from SQS queue orders-queue only when both of these conditions hold:
The SQS message attribute client has the value a.
The JSON message body contains "important": true.
In the example above, the local application will receive a subset of messages from all SQS queues described in the registry. All received messages will have an SQS attribute baggage containing mirrord-session=pr-123. * is a special queue ID for SQS queues that resolves to all queues described in the registry.
In the example above, the local application will receive a subset of messages from the Pub/Sub subscription described in the MirrordSplitConfig under ID user-events. All received messages will have a Pub/Sub attribute env with the value dev.
In the example above, the local application will receive messages from the Pub/Sub subscription user-events only when the message body (base64-decoded) is valid JSON and contains "user_id": "test-user".
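In the JSON representation of a PubsubMessage, the payload sits in the data field as a base64-encoded string, which is why the condition above applies to the decoded body. A Python stand-in for that condition, assuming it receives the PubsubMessage JSON as a dict; it mirrors the logic only, not mirrord's jq evaluation.

```python
import base64
import binascii
import json


def data_has_test_user(pubsub_message: dict) -> bool:
    """Decode PubsubMessage.data and check for "user_id": "test-user"."""
    try:
        raw = base64.b64decode(pubsub_message.get("data", ""), validate=True)
        payload = json.loads(raw)
    except (binascii.Error, ValueError):
        return False  # invalid base64 or non-JSON payload: no match
    return isinstance(payload, dict) and payload.get("user_id") == "test-user"
```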
In the example above, the local application will receive a subset of messages from all Pub/Sub subscriptions described in the target's MirrordSplitConfig. All received messages will have a Pub/Sub attribute env with the value dev. * resolves to all queues defined in the MirrordSplitConfig for the target workload. If no MirrordSplitConfig exists, the wildcard is silently ignored. Check the operator logs if messages are not being filtered as expected.
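The wildcard acts as shorthand that is expanded against the queue IDs known for the target. A hypothetical sketch of that expansion: an empty list of known IDs stands for a target without a MirrordSplitConfig, so * expands to nothing, matching the "silently ignored" behavior. How * interacts with explicitly named IDs is not described here; keeping the explicit entry is an assumption.

```python
from typing import Dict, List


def expand_wildcard(split_queues: Dict[str, dict], known_ids: List[str]) -> Dict[str, dict]:
    """Expand a "*" entry into one entry per queue ID defined for the target."""
    expanded = {qid: f for qid, f in split_queues.items() if qid != "*"}
    wildcard = split_queues.get("*")
    if wildcard is None:
        return expanded
    for qid in known_ids:  # empty when the target has no MirrordSplitConfig
        # Assumption: an explicitly configured ID keeps its own filter.
        expanded.setdefault(qid, wildcard)
    return expanded
```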
FAQ
How do I authenticate operator's Kafka client with an SSL certificate?
An example MirrordKafkaClientConfig would look as follows:
Alternatively, you can store the credentials in a secret, and have them loaded to the config automatically:
How do I authenticate operator's Kafka client with a Java KeyStore?
The mirrord operator does not support direct use of JKS files. To use JKS files with Kafka splitting, first extract all the necessary certificates and the key to PEM files. You can do it like this:
Then, follow the guide for authenticating with an SSL certificate.
Troubleshooting SQS splitting
If you're trying to use SQS splitting and are facing difficulties, here are some steps that can help you identify and hopefully solve the problem.
First, some generally applicable steps:
Make sure a MirrordWorkloadQueueRegistry exists for the workload you're targeting:
Note that the queue IDs in the mirrord configuration have to match the queue IDs in the MirrordWorkloadQueueRegistry of the target you use.
Get the logs from the mirrord operator, in case it becomes necessary for the mirrord team to look into your issue, like this:
If that fails, you might not have permission to read the operator's logs.
To get especially helpful logs, you can change the log level for SQS splitting, then reproduce the issue to get the relevant logs. You can do this by reinstalling the helm chart and setting the operator.logLevel helm value to mirrord=info,operator=info,operator_sqs_splitting::forwarder=trace:
or by setting the RUST_LOG environment variable in the operator's deployment to mirrord=info,operator=info,operator_sqs_splitting::forwarder=trace, e.g. by using kubectl edit deploy mirrord-operator -n mirrord.
Some operations, like changing the MirrordWorkloadQueueRegistry of a workload while there are active sessions to that target, are not yet supported and can lead to a bad state for mirrord. If you've reached such a state:
Delete all the mirrord SQS session resources of the affected target. Those resources, MirrordSqsSession, are not the same as MirrordWorkloadQueueRegistry. You can delete them with:
Restart the mirrord operator, e.g.:
If some (but not all) of the messages that should arrive at the local service arrive at the remote service
It's possible the target workload's restart is not complete yet, and there are still pods reading directly from the original queue (those will be pods that DO NOT have an operator.metalbear.co/patched label). You can wait a bit for them to be replaced with new pods, patched by mirrord, that read from a temporary queue created by mirrord, or you can delete them.
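To find the pods that still read from the original queue, you can look for pods without the operator.metalbear.co/patched label. A small sketch that parses the output of kubectl get pods -o json (a standard kubectl output format); the function name is hypothetical.

```python
import json
from typing import List

PATCHED_LABEL = "operator.metalbear.co/patched"


def unpatched_pods(kubectl_json: str) -> List[str]:
    """Names of pods in a `kubectl get pods -o json` listing without the patched label."""
    pod_list = json.loads(kubectl_json)
    names = []
    for pod in pod_list.get("items", []):
        labels = pod.get("metadata", {}).get("labels") or {}
        if PATCHED_LABEL not in labels:
            names.append(pod["metadata"]["name"])
    return names
```

For a quick check without Python, kubectl's set-based selector can do the same filtering, e.g. kubectl get pods -n <namespace> -l '!operator.metalbear.co/patched', ideally combined with your workload's own label selector.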
If all SQS sessions are over but the remote service still hasn't switched back to reading from the original queue
When there are no more queue splitting sessions to a target, the target workload will not immediately be changed to read directly from the original queue. Instead, it will keep reading from the temporary queue until it's empty, so that no messages intended for the remote service are lost.
If the target workload doesn't change back within the expected time, check its logs and make sure it is consuming queue messages.
If you don't want to wait for the remote service to drain the temporary queue, and you don't care about losing those messages, you can set the operator.sqsSplittingLingerTimeout value in the operator's helm chart, to set a timeout for the draining of the temporary queue.
If that service is consuming messages correctly and the temporary queue is already empty, but the target application still isn't restored to its original state, please try restarting the application, deleting any lingering MirrordSqsSession objects and, if possible, restarting the mirrord operator.