Chapter 4: WSO2 Integration
Enterprise Integration Patterns
WSO2 Enterprise Integrator (EI) and Micro Integrator (MI) implement Enterprise Integration Patterns (EIP) for connecting systems, applications, and data sources.
Common Integration Challenges
- Heterogeneous Systems: Different protocols, data formats, and interfaces
- Point-to-Point Complexity: Each integration creates dependencies
- Data Transformation: Converting between formats
- Routing Logic: Directing messages to correct destinations
- Error Handling: Managing failures and retries
Solution: Mediation Layer
System A ──┐
│
System B ──┼── WSO2 Integration ──┼── Database
│ Layer │
System C ──┘ └── Cloud Service
Synapse Mediation Engine
Core Concepts
Synapse is the underlying mediation engine that processes messages.
Key Components:
- Proxy Services: Entry points for messages
- APIs: RESTful service interfaces
- Sequences: Ordered set of mediators
- Mediators: Individual processing units
- Endpoints: Backend service definitions
- Message Stores: Persistent message storage
- Message Processors: Process stored messages
Message Flow
Request → Proxy/API → In Sequence → Endpoint → Out Sequence → Response
↓
Mediators
(Transform, Log, Filter, etc.)
Integration Artifacts
1. Proxy Services
SOAP/HTTP service proxies with mediation.
Example:
<proxy name="StockQuoteProxy" startOnLoad="true">
<target>
<inSequence>
<log level="full"/>
<send>
<endpoint>
<address uri="http://backend:9000/services/SimpleStockQuoteService"/>
</endpoint>
</send>
</inSequence>
<outSequence>
<send/>
</outSequence>
</target>
</proxy>
Use Cases:
- SOAP service mediation
- Protocol bridging
- Service virtualization
- Legacy system integration
2. REST APIs
RESTful API definitions with resources and methods.
Example:
<api name="HealthcareAPI" context="/healthcare">
<resource methods="GET" uri-template="/doctors">
<inSequence>
<log level="custom">
<property name="message" value="Get all doctors"/>
</log>
<call>
<endpoint>
<http method="get" uri-template="http://backend/api/doctors"/>
</endpoint>
</call>
<respond/>
</inSequence>
</resource>
<resource methods="POST" uri-template="/appointments">
<inSequence>
<property name="POST_TO_URI" value="true" scope="axis2"/>
<call>
<endpoint>
<http method="post" uri-template="http://backend/api/appointments"/>
</endpoint>
</call>
<respond/>
</inSequence>
</resource>
</api>
3. Sequences
Reusable mediation logic.
Example:
<sequence name="errorHandlingSequence">
<log level="custom">
<property name="STATUS" value="ERROR"/>
<property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
</log>
<payloadFactory media-type="json">
<format>
{
"error": true,
"message": "$1"
}
</format>
<args>
<arg expression="get-property('ERROR_MESSAGE')"/>
</args>
</payloadFactory>
<property name="HTTP_SC" value="500" scope="axis2"/>
<respond/>
</sequence>
4. Endpoints
Backend service definitions.
Types:
Address Endpoint:
<endpoint name="SimpleEndpoint">
<address uri="http://backend:8080/service"/>
</endpoint>
HTTP Endpoint:
<endpoint name="RestEndpoint">
<http method="get" uri-template="http://backend/api/users/{id}"/>
</endpoint>
Load-Balance Endpoint:
<endpoint name="LoadBalancedEndpoint">
<loadbalance algorithm="roundRobin">
<endpoint>
<address uri="http://server1:8080/service"/>
</endpoint>
<endpoint>
<address uri="http://server2:8080/service"/>
</endpoint>
</loadbalance>
</endpoint>
Failover Endpoint:
<endpoint name="FailoverEndpoint">
<failover>
<endpoint>
<address uri="http://primary:8080/service"/>
</endpoint>
<endpoint>
<address uri="http://secondary:8080/service"/>
</endpoint>
</failover>
</endpoint>
Mediators
Core Mediators
1. Log Mediator
<log level="custom">
<property name="Message" value="Processing request"/>
<property name="User" expression="$ctx:user"/>
<property name="Timestamp" expression="get-property('SYSTEM_TIME')"/>
</log>
Log Levels:
full: Complete SOAP envelopesimple: Basic infoheaders: SOAP headers onlycustom: Specified properties
2. Property Mediator
<!-- Set property -->
<property name="userId" expression="json-eval($.user.id)" scope="default"/>
<!-- Use property -->
<property name="userIdValue" expression="$ctx:userId"/>
Property Scopes:
default: Message contextaxis2: Transport leveltransport: HTTP headersoperation: Operation context
3. PayloadFactory Mediator
Transform message payload.
<payloadFactory media-type="json">
<format>
{
"orderId": "$1",
"status": "$2",
"timestamp": "$3"
}
</format>
<args>
<arg expression="json-eval($.id)"/>
<arg value="CONFIRMED"/>
<arg expression="get-property('SYSTEM_TIME')"/>
</args>
</payloadFactory>
4. Call Mediator
Non-blocking service call.
<call>
<endpoint>
<http method="post" uri-template="http://backend/api/orders"/>
</endpoint>
</call>
5. Send Mediator
Send message to endpoint.
<send>
<endpoint key="BackendEndpoint"/>
</send>
6. Respond Mediator
Return response to client.
<respond/>
Transform Mediators
1. Data Mapper
Graphical data transformation.
<datamapper config="gov:datamapper/transform_config.dmc"
inputSchema="gov:datamapper/input_schema.json"
outputSchema="gov:datamapper/output_schema.json"/>
2. XSLT Mediator
XML transformations.
<xslt key="gov:xslt/transform.xsl">
<property name="param1" value="value1"/>
</xslt>
3. Script Mediator
Custom transformations using JavaScript, Groovy, etc.
<script language="js">
<![CDATA[
var payload = mc.getPayloadJSON();
payload.processed = true;
payload.timestamp = new Date().getTime();
mc.setPayloadJSON(payload);
]]>
</script>
Filter Mediators
1. Filter Mediator
Conditional processing.
<filter xpath="$ctx:status = 'VIP'">
<then>
<log>
<property name="message" value="VIP customer"/>
</log>
<call>
<endpoint key="VIPEndpoint"/>
</endpoint>
</then>
<else>
<call>
<endpoint key="RegularEndpoint"/>
</endpoint>
</else>
</filter>
2. Switch Mediator
Multiple condition routing.
<switch source="get-property('type')">
<case regex="URGENT">
<call>
<endpoint key="UrgentQueue"/>
</endpoint>
</case>
<case regex="NORMAL">
<call>
<endpoint key="NormalQueue"/>
</endpoint>
</case>
<default>
<call>
<endpoint key="DefaultQueue"/>
</endpoint>
</default>
</switch>
Integration Patterns
1. Content-Based Routing
Route messages based on content.
<api name="RoutingAPI" context="/route">
<resource methods="POST" uri-template="/order">
<inSequence>
<property name="orderType" expression="json-eval($.type)"/>
<switch source="get-property('orderType')">
<case regex="EXPRESS">
<call>
<endpoint>
<http uri-template="http://express-backend/orders"/>
</endpoint>
</call>
</case>
<case regex="STANDARD">
<call>
<endpoint>
<http uri-template="http://standard-backend/orders"/>
</endpoint>
</call>
</case>
</switch>
<respond/>
</inSequence>
</resource>
</api>
2. Message Enrichment
Add data from external sources.
<enrich>
<source type="inline" clone="true">
<customerInfo>
<id>12345</id>
</customerInfo>
</source>
<target type="body" action="child"/>
</enrich>
<!-- Or enrich from backend call -->
<call>
<endpoint>
<http method="get" uri-template="http://backend/customers/{id}"/>
</endpoint>
</call>
<enrich>
<source type="body" clone="true"/>
<target type="property" property="customerData"/>
</enrich>
3. Scatter-Gather
Call multiple services and aggregate results.
<api name="AggregationAPI" context="/aggregate">
<resource methods="GET" uri-template="/data">
<inSequence>
<!-- Clone message -->
<clone>
<target>
<sequence>
<call>
<endpoint>
<http uri-template="http://service1/data"/>
</endpoint>
</call>
<property name="service1Response" expression="json-eval($)"/>
</sequence>
</target>
<target>
<sequence>
<call>
<endpoint>
<http uri-template="http://service2/data"/>
</endpoint>
</call>
<property name="service2Response" expression="json-eval($)"/>
</sequence>
</target>
</clone>
<!-- Aggregate results -->
<aggregate>
<completeCondition>
<messageCount min="2" max="2"/>
</completeCondition>
<onComplete expression="json-eval($)">
<payloadFactory media-type="json">
<format>
{
"service1": $1,
"service2": $2
}
</format>
<args>
<arg expression="$ctx:service1Response"/>
<arg expression="$ctx:service2Response"/>
</args>
</payloadFactory>
<respond/>
</onComplete>
</aggregate>
</inSequence>
</resource>
</api>
4. Message Store and Forward
Guaranteed delivery with retry.
<!-- Message Store -->
<messageStore name="OrderStore" class="org.apache.synapse.message.store.impl.jms.JmsStore">
<parameter name="store.jms.destination">OrderQueue</parameter>
</messageStore>
<!-- Store messages -->
<store messageStore="OrderStore"/>
<!-- Message Processor -->
<messageProcessor
name="OrderProcessor"
class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
messageStore="OrderStore">
<parameter name="interval">10000</parameter>
<parameter name="max.delivery.attempts">4</parameter>
<parameter name="target.endpoint">BackendEndpoint</parameter>
</messageProcessor>
5. Protocol Switching
Convert between protocols.
<!-- REST to SOAP -->
<api name="RestToSoap" context="/rest">
<resource methods="POST" uri-template="/convert">
<inSequence>
<property name="userId" expression="json-eval($.userId)"/>
<!-- Build SOAP envelope -->
<payloadFactory media-type="xml">
<format>
<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
<soap:Body>
<getUserRequest>
<userId>$1</userId>
</getUserRequest>
</soap:Body>
</soap:Envelope>
</format>
<args>
<arg expression="$ctx:userId"/>
</args>
</payloadFactory>
<header name="Action" value="urn:getUser"/>
<property name="messageType" value="application/soap+xml" scope="axis2"/>
<call>
<endpoint>
<address uri="http://soap-backend/UserService"/>
</endpoint>
</call>
<respond/>
</inSequence>
</resource>
</api>
Connectors
Pre-built Connectors
WSO2 provides connectors for popular services and protocols.
Categories:
- Messaging: Kafka, RabbitMQ, JMS, MQTT
- Databases: MySQL, PostgreSQL, MongoDB, Cassandra
- Cloud: AWS, Azure, Google Cloud
- SaaS: Salesforce, ServiceNow, Gmail, Slack
- Social: Twitter, Facebook, LinkedIn
- Payment: PayPal, Stripe
- File Systems: SFTP, FTP, SMB
Using Connectors
Example: Kafka Connector:
<!-- Import connector -->
<dependency>
<connector>
<groupId>org.wso2.carbon.connector</groupId>
<artifactId>kafka</artifactId>
<version>3.0.0</version>
</connector>
</dependency>
<!-- Use connector -->
<kafkaTransport.init>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
</kafkaTransport.init>
<kafkaTransport.publishMessages>
<topic>orders</topic>
<key>order-123</key>
<value expression="json-eval($)"/>
</kafkaTransport.publishMessages>
Example: Salesforce Connector:
<salesforce.init>
<username>user@example.com</username>
<password>password</password>
<securityToken>token</securityToken>
<loginUrl>https://login.salesforce.com</loginUrl>
</salesforce.init>
<salesforce.create>
<sobject>Account</sobject>
<fieldAndValue>
{
"Name": "Acme Corp",
"Industry": "Technology"
}
</fieldAndValue>
</salesforce.create>
Error Handling
Fault Sequences
<api name="ErrorHandlingAPI" context="/api">
<resource methods="POST" uri-template="/process">
<inSequence>
<call>
<endpoint>
<address uri="http://backend/process"/>
</endpoint>
</call>
<respond/>
</inSequence>
<!-- Fault sequence for errors -->
<faultSequence>
<log level="custom">
<property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
<property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
</log>
<payloadFactory media-type="json">
<format>
{
"error": true,
"code": "$1",
"message": "$2"
}
</format>
<args>
<arg expression="get-property('ERROR_CODE')"/>
<arg expression="get-property('ERROR_MESSAGE')"/>
</args>
</payloadFactory>
<property name="HTTP_SC" value="500" scope="axis2"/>
<respond/>
</faultSequence>
</resource>
</api>
Try-Catch-Finally
<sequence name="ProcessingSequence">
<try>
<call>
<endpoint>
<address uri="http://backend/process"/>
</endpoint>
</call>
<property name="SUCCESS" value="true"/>
</try>
<catch>
<log level="custom">
<property name="ERROR" value="Call failed"/>
</log>
<property name="SUCCESS" value="false"/>
</catch>
<finally>
<log level="custom">
<property name="STATUS" expression="$ctx:SUCCESS"/>
</log>
</finally>
</sequence>
WSO2 Integration Studio
Development Tool
Integration Studio is the Eclipse-based IDE for developing integration solutions.
Key Features:
- Graphical design view
- Source code view
- Built-in templates
- Connector palette
- Debugging capabilities
- Docker/Kubernetes export
Creating a Project:
File → New → Integration Project
├── ESB Configs
├── Composite Exporter
├── Registry Resources
└── Connector Exporter
Deploying:
Right-click on Composite Exporter → Export Composite Application
→ Deploy to Micro Integrator
Key Takeaways
- WSO2 Integration implements Enterprise Integration Patterns
- Synapse is the core mediation engine
- Proxy Services and REST APIs are entry points
- Mediators perform transformations, routing, and processing
- Connectors provide pre-built integrations
- Message Store and Forward ensures guaranteed delivery
- Integration Studio is the development tool
Next Steps
Continue to Chapter 5: WSO2 Identity Server to learn about identity and access management.