Chapter 4: WSO2 Integration

Enterprise Integration Patterns

WSO2 Enterprise Integrator (EI) and Micro Integrator (MI) implement Enterprise Integration Patterns (EIP) for connecting systems, applications, and data sources.

Common Integration Challenges

  1. Heterogeneous Systems: Different protocols, data formats, and interfaces
  2. Point-to-Point Complexity: Each integration creates dependencies
  3. Data Transformation: Converting between formats
  4. Routing Logic: Directing messages to correct destinations
  5. Error Handling: Managing failures and retries

Solution: Mediation Layer

System A ──┐
           │
System B ──┼── WSO2 Integration ──┼── Database
           │       Layer           │
System C ──┘                       └── Cloud Service

Synapse Mediation Engine

Core Concepts

Synapse is the underlying mediation engine that processes messages.

Key Components:

  • Proxy Services: Entry points for messages
  • APIs: RESTful service interfaces
  • Sequences: Ordered set of mediators
  • Mediators: Individual processing units
  • Endpoints: Backend service definitions
  • Message Stores: Persistent message storage
  • Message Processors: Process stored messages

Message Flow

Request → Proxy/API → In Sequence → Endpoint → Out Sequence → Response
                          ↓
                      Mediators
                    (Transform, Log, Filter, etc.)

Integration Artifacts

1. Proxy Services

SOAP/HTTP service proxies with mediation.

Example:

<proxy name="StockQuoteProxy" startOnLoad="true">
  <target>
    <inSequence>
      <log level="full"/>
      <send>
        <endpoint>
          <address uri="http://backend:9000/services/SimpleStockQuoteService"/>
        </endpoint>
      </send>
    </inSequence>
    <outSequence>
      <send/>
    </outSequence>
  </target>
</proxy>

Use Cases:

  • SOAP service mediation
  • Protocol bridging
  • Service virtualization
  • Legacy system integration

2. REST APIs

RESTful API definitions with resources and methods.

Example:

<api name="HealthcareAPI" context="/healthcare">
  <resource methods="GET" uri-template="/doctors">
    <inSequence>
      <log level="custom">
        <property name="message" value="Get all doctors"/>
      </log>
      <call>
        <endpoint>
          <http method="get" uri-template="http://backend/api/doctors"/>
        </endpoint>
      </call>
      <respond/>
    </inSequence>
  </resource>
  
  <resource methods="POST" uri-template="/appointments">
    <inSequence>
      <property name="POST_TO_URI" value="true" scope="axis2"/>
      <call>
        <endpoint>
          <http method="post" uri-template="http://backend/api/appointments"/>
        </endpoint>
      </call>
      <respond/>
    </inSequence>
  </resource>
</api>

3. Sequences

Reusable mediation logic.

Example:

<sequence name="errorHandlingSequence">
  <log level="custom">
    <property name="STATUS" value="ERROR"/>
    <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
  </log>
  <payloadFactory media-type="json">
    <format>
      {
        "error": true,
        "message": "$1"
      }
    </format>
    <args>
      <arg expression="get-property('ERROR_MESSAGE')"/>
    </args>
  </payloadFactory>
  <property name="HTTP_SC" value="500" scope="axis2"/>
  <respond/>
</sequence>

4. Endpoints

Backend service definitions.

Types:

Address Endpoint:

<endpoint name="SimpleEndpoint">
  <address uri="http://backend:8080/service"/>
</endpoint>

HTTP Endpoint:

<endpoint name="RestEndpoint">
  <http method="get" uri-template="http://backend/api/users/{id}"/>
</endpoint>

Load-Balance Endpoint:

<endpoint name="LoadBalancedEndpoint">
  <loadbalance algorithm="roundRobin">
    <endpoint>
      <address uri="http://server1:8080/service"/>
    </endpoint>
    <endpoint>
      <address uri="http://server2:8080/service"/>
    </endpoint>
  </loadbalance>
</endpoint>

Failover Endpoint:

<endpoint name="FailoverEndpoint">
  <failover>
    <endpoint>
      <address uri="http://primary:8080/service"/>
    </endpoint>
    <endpoint>
      <address uri="http://secondary:8080/service"/>
    </endpoint>
  </failover>
</endpoint>

Mediators

Core Mediators

1. Log Mediator

<log level="custom">
  <property name="Message" value="Processing request"/>
  <property name="User" expression="$ctx:user"/>
  <property name="Timestamp" expression="get-property('SYSTEM_TIME')"/>
</log>

Log Levels:

  • full: Complete SOAP envelope
  • simple: Basic info
  • headers: SOAP headers only
  • custom: Specified properties

2. Property Mediator

<!-- Set property -->
<property name="userId" expression="json-eval($.user.id)" scope="default"/>

<!-- Use property -->
<property name="userIdValue" expression="$ctx:userId"/>

Property Scopes:

  • default: Message context
  • axis2: Transport level
  • transport: HTTP headers
  • operation: Operation context

3. PayloadFactory Mediator

Transform message payload.

<payloadFactory media-type="json">
  <format>
    {
      "orderId": "$1",
      "status": "$2",
      "timestamp": "$3"
    }
  </format>
  <args>
    <arg expression="json-eval($.id)"/>
    <arg value="CONFIRMED"/>
    <arg expression="get-property('SYSTEM_TIME')"/>
  </args>
</payloadFactory>

4. Call Mediator

Non-blocking service call.

<call>
  <endpoint>
    <http method="post" uri-template="http://backend/api/orders"/>
  </endpoint>
</call>

5. Send Mediator

Send message to endpoint.

<send>
  <endpoint key="BackendEndpoint"/>
</send>

6. Respond Mediator

Return response to client.

<respond/>

Transform Mediators

1. Data Mapper

Graphical data transformation.

<datamapper config="gov:datamapper/transform_config.dmc" 
            inputSchema="gov:datamapper/input_schema.json" 
            outputSchema="gov:datamapper/output_schema.json"/>

2. XSLT Mediator

XML transformations.

<xslt key="gov:xslt/transform.xsl">
  <property name="param1" value="value1"/>
</xslt>

3. Script Mediator

Custom transformations using JavaScript, Groovy, etc.

<script language="js">
<![CDATA[
  var payload = mc.getPayloadJSON();
  payload.processed = true;
  payload.timestamp = new Date().getTime();
  mc.setPayloadJSON(payload);
]]>
</script>

Filter Mediators

1. Filter Mediator

Conditional processing.

<filter xpath="$ctx:status = 'VIP'">
  <then>
    <log>
      <property name="message" value="VIP customer"/>
    </log>
    <call>
      <endpoint key="VIPEndpoint"/>
    </endpoint>
  </then>
  <else>
    <call>
      <endpoint key="RegularEndpoint"/>
    </endpoint>
  </else>
</filter>

2. Switch Mediator

Multiple condition routing.

<switch source="get-property('type')">
  <case regex="URGENT">
    <call>
      <endpoint key="UrgentQueue"/>
    </endpoint>
  </case>
  <case regex="NORMAL">
    <call>
      <endpoint key="NormalQueue"/>
    </endpoint>
  </case>
  <default>
    <call>
      <endpoint key="DefaultQueue"/>
    </endpoint>
  </default>
</switch>

Integration Patterns

1. Content-Based Routing

Route messages based on content.

<api name="RoutingAPI" context="/route">
  <resource methods="POST" uri-template="/order">
    <inSequence>
      <property name="orderType" expression="json-eval($.type)"/>
      <switch source="get-property('orderType')">
        <case regex="EXPRESS">
          <call>
            <endpoint>
              <http uri-template="http://express-backend/orders"/>
            </endpoint>
          </call>
        </case>
        <case regex="STANDARD">
          <call>
            <endpoint>
              <http uri-template="http://standard-backend/orders"/>
            </endpoint>
          </call>
        </case>
      </switch>
      <respond/>
    </inSequence>
  </resource>
</api>

2. Message Enrichment

Add data from external sources.

<enrich>
  <source type="inline" clone="true">
    <customerInfo>
      <id>12345</id>
    </customerInfo>
  </source>
  <target type="body" action="child"/>
</enrich>

<!-- Or enrich from backend call -->
<call>
  <endpoint>
    <http method="get" uri-template="http://backend/customers/{id}"/>
  </endpoint>
</call>
<enrich>
  <source type="body" clone="true"/>
  <target type="property" property="customerData"/>
</enrich>

3. Scatter-Gather

Call multiple services and aggregate results.

<api name="AggregationAPI" context="/aggregate">
  <resource methods="GET" uri-template="/data">
    <inSequence>
      <!-- Clone message -->
      <clone>
        <target>
          <sequence>
            <call>
              <endpoint>
                <http uri-template="http://service1/data"/>
              </endpoint>
            </call>
            <property name="service1Response" expression="json-eval($)"/>
          </sequence>
        </target>
        <target>
          <sequence>
            <call>
              <endpoint>
                <http uri-template="http://service2/data"/>
              </endpoint>
            </call>
            <property name="service2Response" expression="json-eval($)"/>
          </sequence>
        </target>
      </clone>
      
      <!-- Aggregate results -->
      <aggregate>
        <completeCondition>
          <messageCount min="2" max="2"/>
        </completeCondition>
        <onComplete expression="json-eval($)">
          <payloadFactory media-type="json">
            <format>
              {
                "service1": $1,
                "service2": $2
              }
            </format>
            <args>
              <arg expression="$ctx:service1Response"/>
              <arg expression="$ctx:service2Response"/>
            </args>
          </payloadFactory>
          <respond/>
        </onComplete>
      </aggregate>
    </inSequence>
  </resource>
</api>

4. Message Store and Forward

Guaranteed delivery with retry.

<!-- Message Store -->
<messageStore name="OrderStore" class="org.apache.synapse.message.store.impl.jms.JmsStore">
  <parameter name="store.jms.destination">OrderQueue</parameter>
</messageStore>

<!-- Store messages -->
<store messageStore="OrderStore"/>

<!-- Message Processor -->
<messageProcessor 
    name="OrderProcessor"
    class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
    messageStore="OrderStore">
  <parameter name="interval">10000</parameter>
  <parameter name="max.delivery.attempts">4</parameter>
  <parameter name="target.endpoint">BackendEndpoint</parameter>
</messageProcessor>

5. Protocol Switching

Convert between protocols.

<!-- REST to SOAP -->
<api name="RestToSoap" context="/rest">
  <resource methods="POST" uri-template="/convert">
    <inSequence>
      <property name="userId" expression="json-eval($.userId)"/>
      
      <!-- Build SOAP envelope -->
      <payloadFactory media-type="xml">
        <format>
          <soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
            <soap:Body>
              <getUserRequest>
                <userId>$1</userId>
              </getUserRequest>
            </soap:Body>
          </soap:Envelope>
        </format>
        <args>
          <arg expression="$ctx:userId"/>
        </args>
      </payloadFactory>
      
      <header name="Action" value="urn:getUser"/>
      <property name="messageType" value="application/soap+xml" scope="axis2"/>
      
      <call>
        <endpoint>
          <address uri="http://soap-backend/UserService"/>
        </endpoint>
      </call>
      <respond/>
    </inSequence>
  </resource>
</api>

Connectors

Pre-built Connectors

WSO2 provides connectors for popular services and protocols.

Categories:

  • Messaging: Kafka, RabbitMQ, JMS, MQTT
  • Databases: MySQL, PostgreSQL, MongoDB, Cassandra
  • Cloud: AWS, Azure, Google Cloud
  • SaaS: Salesforce, ServiceNow, Gmail, Slack
  • Social: Twitter, Facebook, LinkedIn
  • Payment: PayPal, Stripe
  • File Systems: SFTP, FTP, SMB

Using Connectors

Example: Kafka Connector:

<!-- Import connector -->
<dependency>
  <connector>
    <groupId>org.wso2.carbon.connector</groupId>
    <artifactId>kafka</artifactId>
    <version>3.0.0</version>
  </connector>
</dependency>

<!-- Use connector -->
<kafkaTransport.init>
  <bootstrapServers>localhost:9092</bootstrapServers>
  <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
  <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
</kafkaTransport.init>

<kafkaTransport.publishMessages>
  <topic>orders</topic>
  <key>order-123</key>
  <value expression="json-eval($)"/>
</kafkaTransport.publishMessages>

Example: Salesforce Connector:

<salesforce.init>
  <username>user@example.com</username>
  <password>password</password>
  <securityToken>token</securityToken>
  <loginUrl>https://login.salesforce.com</loginUrl>
</salesforce.init>

<salesforce.create>
  <sobject>Account</sobject>
  <fieldAndValue>
    {
      "Name": "Acme Corp",
      "Industry": "Technology"
    }
  </fieldAndValue>
</salesforce.create>

Error Handling

Fault Sequences

<api name="ErrorHandlingAPI" context="/api">
  <resource methods="POST" uri-template="/process">
    <inSequence>
      <call>
        <endpoint>
          <address uri="http://backend/process"/>
        </endpoint>
      </call>
      <respond/>
    </inSequence>
    
    <!-- Fault sequence for errors -->
    <faultSequence>
      <log level="custom">
        <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
        <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
      </log>
      
      <payloadFactory media-type="json">
        <format>
          {
            "error": true,
            "code": "$1",
            "message": "$2"
          }
        </format>
        <args>
          <arg expression="get-property('ERROR_CODE')"/>
          <arg expression="get-property('ERROR_MESSAGE')"/>
        </args>
      </payloadFactory>
      
      <property name="HTTP_SC" value="500" scope="axis2"/>
      <respond/>
    </faultSequence>
  </resource>
</api>

Try-Catch-Finally

<sequence name="ProcessingSequence">
  <try>
    <call>
      <endpoint>
        <address uri="http://backend/process"/>
      </endpoint>
    </call>
    <property name="SUCCESS" value="true"/>
  </try>
  
  <catch>
    <log level="custom">
      <property name="ERROR" value="Call failed"/>
    </log>
    <property name="SUCCESS" value="false"/>
  </catch>
  
  <finally>
    <log level="custom">
      <property name="STATUS" expression="$ctx:SUCCESS"/>
    </log>
  </finally>
</sequence>

WSO2 Integration Studio

Development Tool

Integration Studio is the Eclipse-based IDE for developing integration solutions.

Key Features:

  • Graphical design view
  • Source code view
  • Built-in templates
  • Connector palette
  • Debugging capabilities
  • Docker/Kubernetes export

Creating a Project:

File → New → Integration Project
  ├── ESB Configs
  ├── Composite Exporter
  ├── Registry Resources
  └── Connector Exporter

Deploying:

Right-click on Composite Exporter → Export Composite Application
→ Deploy to Micro Integrator

Key Takeaways

  • WSO2 Integration implements Enterprise Integration Patterns
  • Synapse is the core mediation engine
  • Proxy Services and REST APIs are entry points
  • Mediators perform transformations, routing, and processing
  • Connectors provide pre-built integrations
  • Message Store and Forward ensures guaranteed delivery
  • Integration Studio is the development tool

Next Steps

Continue to Chapter 5: WSO2 Identity Server to learn about identity and access management.