Chapter 12: Advanced Integration Patterns

Overview

Once you've mastered the basics of WSO2 mediation, APIs, and security, you need patterns that handle real-world complexity: parallel fan-out, intelligent routing, reliable delivery, long-running transactions, and event-driven communication. This chapter covers advanced enterprise integration patterns as implemented in WSO2 Micro Integrator and API Manager.

Scatter-Gather Pattern

Scatter-Gather sends a request to multiple backends in parallel and aggregates the results into a single response. This is essential when a client needs data from several services combined into one payload.

Architecture

              ┌──────────────┐
              │   Service A   │
         ┌───▶  (inventory)  ├───┐
         │    └──────────────┘   │
         │                       │
Request  │    ┌──────────────┐   │   Aggregated
───▶ Clone ──▶│   Service B   │──┼──▶ Response
         │    │   (pricing)  │   │
         │    └──────────────┘   │
         │                       │
         │    ┌──────────────┐   │
         └───▶│   Service C   │──┘
              │  (shipping)   │
              └──────────────┘

Full Implementation

<api name="ProductDetailsAPI" context="/product" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="GET" uri-template="/{productId}">
        <inSequence>
            <property name="productId" expression="get-property('uri.var.productId')"/>

            <!-- Scatter: Clone message to three backends -->
            <clone>
                <target>
                    <sequence>
                        <call>
                            <endpoint>
                                <http method="get"
                                      uri-template="http://inventory:8080/stock/{uri.var.productId}"/>
                            </endpoint>
                        </call>
                        <enrich>
                            <source type="body" clone="true"/>
                            <target type="property" property="inventoryResponse"/>
                        </enrich>
                    </sequence>
                </target>
                <target>
                    <sequence>
                        <call>
                            <endpoint>
                                <http method="get"
                                      uri-template="http://pricing:8080/price/{uri.var.productId}"/>
                            </endpoint>
                        </call>
                        <enrich>
                            <source type="body" clone="true"/>
                            <target type="property" property="pricingResponse"/>
                        </enrich>
                    </sequence>
                </target>
                <target>
                    <sequence>
                        <call>
                            <endpoint>
                                <http method="get"
                                      uri-template="http://shipping:8080/estimate/{uri.var.productId}"/>
                            </endpoint>
                        </call>
                        <enrich>
                            <source type="body" clone="true"/>
                            <target type="property" property="shippingResponse"/>
                        </enrich>
                    </sequence>
                </target>
            </clone>

            <!-- Gather: Aggregate results into single response -->
            <aggregate>
                <completeCondition timeout="10">
                    <messageCount min="3" max="3"/>
                </completeCondition>
                <onComplete expression="json-eval($)">
                    <payloadFactory media-type="json">
                        <format>
                            {
                                "product": "$1",
                                "inventory": $2,
                                "pricing": $3,
                                "shipping": $4
                            }
                        </format>
                        <args>
                            <arg expression="$ctx:productId"/>
                            <arg expression="$ctx:inventoryResponse"/>
                            <arg expression="$ctx:pricingResponse"/>
                            <arg expression="$ctx:shippingResponse"/>
                        </args>
                    </payloadFactory>
                    <respond/>
                </onComplete>
            </aggregate>
        </inSequence>
        <faultSequence>
            <payloadFactory media-type="json">
                <format>{"error": "Failed to gather product details", "code": "SCATTER_FAIL"}</format>
                <args/>
            </payloadFactory>
            <property name="HTTP_SC" value="500" scope="axis2"/>
            <respond/>
        </faultSequence>
    </resource>
</api>

Scatter-Gather with Partial Results

When some backends may fail but you still want to return whatever succeeded:

<aggregate>
    <completeCondition timeout="5">
        <!-- Accept 1 to 3 results; don't fail if one backend is down -->
        <messageCount min="1" max="3"/>
    </completeCondition>
    <onComplete expression="json-eval($)">
        <script language="js">
        <![CDATA[
            var payload = mc.getPayloadJSON();
            var result = { available: [], failed: [] };

            // Each clone target's response is an element in the aggregated array
            for (var i = 0; i < payload.length; i++) {
                if (payload[i].error) {
                    result.failed.push(payload[i].source);
                } else {
                    result.available.push(payload[i]);
                }
            }
            mc.setPayloadJSON(result);
        ]]>
        </script>
        <respond/>
    </onComplete>
</aggregate>

Content-Based Routing

Route messages to different backends based on message content, headers, or context properties.

Switch Mediator

<api name="OrderRoutingAPI" context="/orders" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <!-- Route based on order type -->
            <switch source="json-eval($.orderType)">
                <case regex="DIGITAL">
                    <call>
                        <endpoint>
                            <http method="post" uri-template="http://digital-fulfillment:8080/process"/>
                        </endpoint>
                    </call>
                </case>
                <case regex="PHYSICAL">
                    <call>
                        <endpoint>
                            <http method="post" uri-template="http://warehouse:8080/ship"/>
                        </endpoint>
                    </call>
                </case>
                <case regex="SUBSCRIPTION">
                    <call>
                        <endpoint>
                            <http method="post" uri-template="http://billing:8080/subscribe"/>
                        </endpoint>
                    </call>
                </case>
                <default>
                    <payloadFactory media-type="json">
                        <format>{"error": "Unknown order type: $1"}</format>
                        <args>
                            <arg expression="json-eval($.orderType)"/>
                        </args>
                    </payloadFactory>
                    <property name="HTTP_SC" value="400" scope="axis2"/>
                </default>
            </switch>
            <respond/>
        </inSequence>
    </resource>
</api>

Filter Mediator (Binary Routing)

<!-- Route based on a header value -->
<filter source="get-property('transport', 'X-Priority')" regex="HIGH">
    <then>
        <call>
            <endpoint key="PriorityQueueEndpoint"/>
        </call>
    </then>
    <else>
        <call>
            <endpoint key="StandardQueueEndpoint"/>
        </call>
    </else>
</filter>

Routing by XPath (XML Messages)

<!-- Route SOAP messages by namespace -->
<switch source="local-name(//*)">
    <case regex="PurchaseOrder">
        <call>
            <endpoint key="PurchaseOrderService"/>
        </call>
    </case>
    <case regex="Invoice">
        <call>
            <endpoint key="InvoiceService"/>
        </call>
    </case>
    <default>
        <sequence key="unknownMessageHandler"/>
    </default>
</switch>

Dynamic Routing

Route to an endpoint determined at runtime:

<!-- Lookup the target from a database or header -->
<dblookup>
    <connection>
        <pool>
            <driver>com.mysql.cj.jdbc.Driver</driver>
            <url>jdbc:mysql://localhost:3306/routing</url>
            <user>route_user</user>
            <password>route_pass</password>
        </pool>
    </connection>
    <statement>
        <sql><![CDATA[SELECT endpoint_url FROM route_table WHERE tenant = ?]]></sql>
        <parameter expression="json-eval($.tenantId)" type="VARCHAR"/>
        <result name="targetUrl" column="endpoint_url"/>
    </statement>
</dblookup>

<property name="targetUrl" expression="$ctx:targetUrl"/>
<call>
    <endpoint>
        <http method="post" uri-template="{uri.var.targetUrl}"/>
    </endpoint>
</call>

Message Routing Patterns

Recipient List

Send a message to a list of recipients determined at runtime.

<api name="NotificationAPI" context="/notify" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <!-- Get list of webhook URLs from payload -->
            <property name="webhooks" expression="json-eval($.webhookUrls)"/>

            <script language="js">
            <![CDATA[
                var webhooks = eval(mc.getProperty('webhooks'));
                mc.setProperty('webhookCount', String(webhooks.length));

                // Store each URL as a numbered property
                for (var i = 0; i < webhooks.length; i++) {
                    mc.setProperty('webhook_' + i, webhooks[i]);
                }
            ]]>
            </script>

            <!-- Iterate over webhook URLs -->
            <iterate expression="json-eval($.webhookUrls)" sequential="false">
                <target>
                    <sequence>
                        <property name="targetUrl" expression="json-eval($)"/>
                        <payloadFactory media-type="json">
                            <format>
                                {
                                    "event": "$1",
                                    "data": $2,
                                    "timestamp": "$3"
                                }
                            </format>
                            <args>
                                <arg expression="$ctx:eventType"/>
                                <arg expression="$ctx:eventData"/>
                                <arg expression="get-property('SYSTEM_TIME')"/>
                            </args>
                        </payloadFactory>
                        <call>
                            <endpoint>
                                <http method="post" uri-template="{uri.var.targetUrl}"/>
                            </endpoint>
                        </call>
                    </sequence>
                </target>
            </iterate>

            <payloadFactory media-type="json">
                <format>{"status": "notifications sent"}</format>
                <args/>
            </payloadFactory>
            <respond/>
        </inSequence>
    </resource>
</api>

Message Filter (Selective Consumer)

Only forward messages that meet specific criteria:

<sequence name="selectiveConsumer" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Drop messages below threshold -->
    <filter source="json-eval($.amount)" regex="^[0-9]{1,2}$">
        <then>
            <log level="custom">
                <property name="action" value="DROPPED: amount below threshold"/>
            </log>
            <drop/>
        </then>
    </filter>

    <!-- Validate required fields exist -->
    <filter source="boolean(json-eval($.customerId))" regex="false">
        <then>
            <payloadFactory media-type="json">
                <format>{"error": "customerId is required"}</format>
                <args/>
            </payloadFactory>
            <property name="HTTP_SC" value="400" scope="axis2"/>
            <respond/>
        </then>
    </filter>

    <!-- Message passes all filters; continue processing -->
    <call>
        <endpoint key="OrderProcessingEndpoint"/>
    </call>
</sequence>

Guaranteed Delivery

Guaranteed delivery ensures messages are not lost even when backends are unavailable. WSO2 implements this through message stores and message processors.

Architecture

Client ──▶ API ──▶ Message Store (JMS/JDBC/In-Memory)
                            │
                    Message Processor (polling)
                            │
                            ▼
                     Backend Service

Message Store Configuration

In-Memory Store (development only):

<messageStore name="OrderStore"
              class="org.apache.synapse.message.store.impl.memory.InMemoryStore"
              xmlns="http://ws.apache.org/ns/synapse"/>

JDBC Message Store (production):

<messageStore name="OrderStore"
              class="org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore"
              xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
    <parameter name="store.jdbc.driver">com.mysql.cj.jdbc.Driver</parameter>
    <parameter name="store.jdbc.connection.url">jdbc:mysql://localhost:3306/messagestore</parameter>
    <parameter name="store.jdbc.username">ms_user</parameter>
    <parameter name="store.jdbc.password">ms_pass</parameter>
    <parameter name="store.jdbc.table">message_store</parameter>
</messageStore>

JMS Message Store (ActiveMQ/RabbitMQ):

<messageStore name="OrderStore"
              class="org.apache.synapse.message.store.impl.jms.JmsStore"
              xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
    <parameter name="java.naming.provider.url">tcp://activemq:61616</parameter>
    <parameter name="store.jms.destination">OrderQueue</parameter>
    <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
</messageStore>

Store Mediator

Place a message into a store from a mediation sequence:

<api name="OrderAPI" context="/orders" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <!-- Validate the order -->
            <property name="orderId" expression="json-eval($.orderId)"/>

            <!-- Store for guaranteed delivery -->
            <store messageStore="OrderStore"/>

            <!-- Return 202 Accepted immediately -->
            <payloadFactory media-type="json">
                <format>{"status": "accepted", "orderId": "$1"}</format>
                <args>
                    <arg expression="$ctx:orderId"/>
                </args>
            </payloadFactory>
            <property name="HTTP_SC" value="202" scope="axis2"/>
            <respond/>
        </inSequence>
    </resource>
</api>

Message Processor

Polls the store and forwards messages to the backend.

Scheduled Message Forwarding Processor:

<messageProcessor name="OrderProcessor"
                  class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
                  targetEndpoint="OrderServiceEndpoint"
                  messageStore="OrderStore"
                  xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="interval">5000</parameter>
    <parameter name="client.retry.interval">5000</parameter>
    <parameter name="max.delivery.attempts">100</parameter>
    <parameter name="max.delivery.drop">DISABLED</parameter>
    <parameter name="is.active">true</parameter>
    <parameter name="member.count">1</parameter>
</messageProcessor>

Message Processor Parameters:

ParameterDescriptionTypical Value
intervalPolling interval (ms)5000
client.retry.intervalWait between retries (ms)5000
max.delivery.attemptsMax retry count50-100
max.delivery.dropDrop after max attemptsDISABLED or ENABLED
is.activeProcessor active on startuptrue
member.countNumber of parallel consumers1-4

Dead Letter Channel

When delivery fails after all retries, route to a dead letter store:

<!-- Failover message store -->
<messageStore name="OrderDeadLetterStore"
              class="org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore"
              xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="store.jdbc.driver">com.mysql.cj.jdbc.Driver</parameter>
    <parameter name="store.jdbc.connection.url">jdbc:mysql://localhost:3306/messagestore</parameter>
    <parameter name="store.jdbc.username">ms_user</parameter>
    <parameter name="store.jdbc.password">ms_pass</parameter>
    <parameter name="store.jdbc.table">dead_letter_store</parameter>
</messageStore>

<!-- Processor with failover -->
<messageProcessor name="OrderProcessor"
                  class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
                  targetEndpoint="OrderServiceEndpoint"
                  messageStore="OrderStore"
                  xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="interval">5000</parameter>
    <parameter name="max.delivery.attempts">10</parameter>
    <parameter name="store.failover.message.store.name">OrderDeadLetterStore</parameter>
    <parameter name="is.active">true</parameter>
</messageProcessor>

Saga Pattern

The Saga pattern manages distributed transactions across multiple services without two-phase commits. Each step has a compensating action that undoes it if a later step fails.

Saga Flow

Step 1: Reserve Inventory ──▶ Step 2: Charge Payment ──▶ Step 3: Ship Order
    │ (compensate: release)       │ (compensate: refund)      │
    │                             │                            │
    ◀── If Step 3 fails ─────────◀── Undo Step 2 ◀───────────┘
    ◀── Undo Step 1 ◀────────────┘

Implementation

<api name="OrderSagaAPI" context="/saga/orders" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <property name="originalPayload" expression="json-eval($)" scope="operation"/>

            <!-- Step 1: Reserve Inventory -->
            <log level="custom">
                <property name="saga-step" value="1-reserve-inventory"/>
            </log>
            <call>
                <endpoint>
                    <http method="post" uri-template="http://inventory:8080/reserve"/>
                </endpoint>
            </call>
            <property name="reservationId" expression="json-eval($.reservationId)" scope="operation"/>

            <!-- Check Step 1 result -->
            <filter source="$axis2:HTTP_SC" regex="200">
                <then>
                    <!-- Step 2: Charge Payment -->
                    <log level="custom">
                        <property name="saga-step" value="2-charge-payment"/>
                    </log>
                    <payloadFactory media-type="json">
                        <format>
                            {
                                "orderId": "$1",
                                "amount": "$2",
                                "currency": "USD"
                            }
                        </format>
                        <args>
                            <arg expression="json-eval($.orderId)" scope="operation"/>
                            <arg expression="json-eval($.totalAmount)" scope="operation"/>
                        </args>
                    </payloadFactory>
                    <call>
                        <endpoint>
                            <http method="post" uri-template="http://payment:8080/charge"/>
                        </endpoint>
                    </call>
                    <property name="paymentId" expression="json-eval($.paymentId)" scope="operation"/>

                    <filter source="$axis2:HTTP_SC" regex="200">
                        <then>
                            <!-- Step 3: Create Shipment -->
                            <log level="custom">
                                <property name="saga-step" value="3-create-shipment"/>
                            </log>
                            <call>
                                <endpoint>
                                    <http method="post" uri-template="http://shipping:8080/ship"/>
                                </endpoint>
                            </call>

                            <filter source="$axis2:HTTP_SC" regex="200">
                                <then>
                                    <!-- All steps succeeded -->
                                    <payloadFactory media-type="json">
                                        <format>{"status": "completed", "orderId": "$1"}</format>
                                        <args>
                                            <arg expression="$ctx:orderId"/>
                                        </args>
                                    </payloadFactory>
                                    <respond/>
                                </then>
                                <else>
                                    <!-- Step 3 failed: compensate Step 2, then Step 1 -->
                                    <sequence key="compensatePayment"/>
                                    <sequence key="compensateInventory"/>
                                    <sequence key="sagaFailureResponse"/>
                                </else>
                            </filter>
                        </then>
                        <else>
                            <!-- Step 2 failed: compensate Step 1 -->
                            <sequence key="compensateInventory"/>
                            <sequence key="sagaFailureResponse"/>
                        </else>
                    </filter>
                </then>
                <else>
                    <!-- Step 1 failed: no compensation needed -->
                    <sequence key="sagaFailureResponse"/>
                </else>
            </filter>
        </inSequence>
    </resource>
</api>

Compensation Sequences

<sequence name="compensatePayment" xmlns="http://ws.apache.org/ns/synapse">
    <log level="custom">
        <property name="saga-compensate" value="refunding payment"/>
    </log>
    <payloadFactory media-type="json">
        <format>{"paymentId": "$1", "reason": "saga_rollback"}</format>
        <args>
            <arg expression="get-property('operation', 'paymentId')"/>
        </args>
    </payloadFactory>
    <call>
        <endpoint>
            <http method="post" uri-template="http://payment:8080/refund"/>
        </endpoint>
    </call>
</sequence>

<sequence name="compensateInventory" xmlns="http://ws.apache.org/ns/synapse">
    <log level="custom">
        <property name="saga-compensate" value="releasing inventory"/>
    </log>
    <payloadFactory media-type="json">
        <format>{"reservationId": "$1"}</format>
        <args>
            <arg expression="get-property('operation', 'reservationId')"/>
        </args>
    </payloadFactory>
    <call>
        <endpoint>
            <http method="post" uri-template="http://inventory:8080/release"/>
        </endpoint>
    </call>
</sequence>

<sequence name="sagaFailureResponse" xmlns="http://ws.apache.org/ns/synapse">
    <payloadFactory media-type="json">
        <format>{"status": "failed", "message": "Order processing failed, all changes rolled back"}</format>
        <args/>
    </payloadFactory>
    <property name="HTTP_SC" value="500" scope="axis2"/>
    <respond/>
</sequence>

Event-Driven Architecture with WSO2

WSO2 supports event-driven patterns through JMS transports, message brokers, and webhooks.

JMS Consumer (Inbound Endpoint)

Listen for messages on a JMS queue and trigger mediation:

<inboundEndpoint name="OrderEventListener"
                 protocol="jms"
                 sequence="processOrderEvent"
                 onError="eventErrorHandler"
                 suspend="false"
                 xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">1000</parameter>
        <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url">tcp://activemq:61616</parameter>
        <parameter name="transport.jms.Destination">OrderEvents</parameter>
        <parameter name="transport.jms.SessionTransacted">true</parameter>
        <parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter>
        <parameter name="transport.jms.ContentType">application/json</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
    </parameters>
</inboundEndpoint>

Event Processing Sequence

<sequence name="processOrderEvent" xmlns="http://ws.apache.org/ns/synapse">
    <property name="eventType" expression="json-eval($.eventType)"/>
    <log level="custom">
        <property name="event-received" expression="$ctx:eventType"/>
    </log>

    <switch source="$ctx:eventType">
        <case regex="ORDER_CREATED">
            <call>
                <endpoint>
                    <http method="post" uri-template="http://fulfillment:8080/new-order"/>
                </endpoint>
            </call>
        </case>
        <case regex="ORDER_CANCELLED">
            <call>
                <endpoint>
                    <http method="post" uri-template="http://fulfillment:8080/cancel"/>
                </endpoint>
            </call>
        </case>
        <case regex="PAYMENT_RECEIVED">
            <call>
                <endpoint>
                    <http method="post" uri-template="http://shipping:8080/prepare"/>
                </endpoint>
            </call>
        </case>
        <default>
            <log level="custom">
                <property name="warn" value="Unknown event type, skipping"/>
            </log>
        </default>
    </switch>
</sequence>

Publishing Events to a Broker

Send events to a JMS topic from a mediation flow:

<sequence name="publishEvent" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Build the event payload -->
    <payloadFactory media-type="json">
        <format>
            {
                "eventType": "$1",
                "orderId": "$2",
                "timestamp": "$3",
                "source": "integration-layer"
            }
        </format>
        <args>
            <arg expression="$ctx:eventType"/>
            <arg expression="$ctx:orderId"/>
            <arg expression="get-property('SYSTEM_TIME')"/>
        </args>
    </payloadFactory>

    <!-- Send to JMS topic -->
    <property name="OUT_ONLY" value="true"/>
    <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
    <call>
        <endpoint>
            <address uri="jms:/OrderEventsTopic?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://activemq:61616&amp;transport.jms.DestinationType=topic"/>
        </endpoint>
    </call>
</sequence>

Webhook Subscription Pattern

Expose a webhook registration API and dispatch events:

<api name="WebhookAPI" context="/webhooks" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Register a webhook -->
    <resource methods="POST" uri-template="/subscribe">
        <inSequence>
            <property name="callbackUrl" expression="json-eval($.callbackUrl)"/>
            <property name="eventType" expression="json-eval($.eventType)"/>

            <!-- Store subscription in database -->
            <dbreport>
                <connection>
                    <pool>
                        <driver>com.mysql.cj.jdbc.Driver</driver>
                        <url>jdbc:mysql://localhost:3306/webhooks</url>
                        <user>wh_user</user>
                        <password>wh_pass</password>
                    </pool>
                </connection>
                <statement>
                    <sql><![CDATA[INSERT INTO subscriptions (callback_url, event_type, created_at) VALUES (?, ?, NOW())]]></sql>
                    <parameter expression="$ctx:callbackUrl" type="VARCHAR"/>
                    <parameter expression="$ctx:eventType" type="VARCHAR"/>
                </statement>
            </dbreport>

            <payloadFactory media-type="json">
                <format>{"status": "subscribed", "eventType": "$1"}</format>
                <args>
                    <arg expression="$ctx:eventType"/>
                </args>
            </payloadFactory>
            <property name="HTTP_SC" value="201" scope="axis2"/>
            <respond/>
        </inSequence>
    </resource>

    <!-- Dispatch an event to all subscribers -->
    <resource methods="POST" uri-template="/dispatch">
        <inSequence>
            <property name="eventType" expression="json-eval($.eventType)"/>
            <property name="eventPayload" expression="json-eval($.data)"/>

            <!-- Look up all subscribers for this event type -->
            <dblookup>
                <connection>
                    <pool>
                        <driver>com.mysql.cj.jdbc.Driver</driver>
                        <url>jdbc:mysql://localhost:3306/webhooks</url>
                        <user>wh_user</user>
                        <password>wh_pass</password>
                    </pool>
                </connection>
                <statement>
                    <sql><![CDATA[SELECT callback_url FROM subscriptions WHERE event_type = ?]]></sql>
                    <parameter expression="$ctx:eventType" type="VARCHAR"/>
                    <result name="callbackUrls" column="callback_url"/>
                </statement>
            </dblookup>

            <!-- Fire-and-forget to each subscriber -->
            <property name="OUT_ONLY" value="true"/>
            <call>
                <endpoint>
                    <http method="post" uri-template="{uri.var.callbackUrls}"/>
                </endpoint>
            </call>

            <payloadFactory media-type="json">
                <format>{"status": "dispatched"}</format>
                <args/>
            </payloadFactory>
            <respond/>
        </inSequence>
    </resource>
</api>

Pattern Comparison

PatternUse CaseWSO2 MechanismLatency Impact
Scatter-GatherCombine data from multiple sourcesClone + AggregateMedium (parallel calls)
Content-Based RoutingRoute by payload/header contentSwitch / FilterLow
Recipient ListDynamic multi-target dispatchIterate + dynamic endpointMedium
Guaranteed DeliveryReliable async processingMessage Store + ProcessorHigh (async)
SagaDistributed transactionsSequential Call + CompensateHigh (multi-step)
Event-DrivenDecoupled async communicationJMS Inbound + TopicsVaries

Combining Patterns

Real integrations often combine several patterns. Example: an order placed via API is validated (content-based routing to select a validator), stored for guaranteed delivery, processed through a saga, and the result published as an event.

<!-- Orchestration combining multiple patterns -->
<api name="FullOrderAPI" context="/v2/orders" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <!-- 1. Content-based validation -->
            <switch source="json-eval($.region)">
                <case regex="EU">
                    <call>
                        <endpoint key="EUValidatorEndpoint"/>
                    </call>
                </case>
                <case regex="US">
                    <call>
                        <endpoint key="USValidatorEndpoint"/>
                    </call>
                </case>
                <default>
                    <call>
                        <endpoint key="DefaultValidatorEndpoint"/>
                    </call>
                </default>
            </switch>

            <!-- 2. Check validation result -->
            <filter source="json-eval($.valid)" regex="true">
                <then>
                    <!-- 3. Guaranteed delivery: store and return 202 -->
                    <store messageStore="OrderStore"/>
                    <payloadFactory media-type="json">
                        <format>{"status": "accepted", "orderId": "$1"}</format>
                        <args>
                            <arg expression="json-eval($.orderId)"/>
                        </args>
                    </payloadFactory>
                    <property name="HTTP_SC" value="202" scope="axis2"/>
                    <respond/>
                </then>
                <else>
                    <payloadFactory media-type="json">
                        <format>{"status": "rejected", "reason": "$1"}</format>
                        <args>
                            <arg expression="json-eval($.reason)"/>
                        </args>
                    </payloadFactory>
                    <property name="HTTP_SC" value="422" scope="axis2"/>
                    <respond/>
                </else>
            </filter>
        </inSequence>
    </resource>
</api>

Key Takeaways

  • Scatter-Gather (Clone + Aggregate) parallelizes calls and merges results with configurable timeouts
  • Content-Based Routing uses Switch/Filter to direct messages to the correct backend
  • Guaranteed Delivery with Message Stores and Processors ensures no message loss during outages
  • The Saga pattern handles distributed transactions through sequential steps with compensating actions
  • Event-Driven Architecture uses JMS inbound endpoints, topics, and webhooks for decoupled communication
  • Patterns compose naturally: combine routing, guaranteed delivery, and sagas in a single flow
  • Always implement fault sequences and dead letter channels for production resilience

Next Steps

Continue to Chapter 13: Quick Reference for a cheat sheet of common configurations, CLI commands, and troubleshooting tips.