Monday, July 23, 2012

Custom CXF Transport



Abstract
Apache CXF is a Web Services and REST framework designed to be extensible and flexible. One important aspect of the CXF framework is its transports, which are responsible for the physical communication between clients and services. This paper describes how transports are organized in CXF. It consists of two parts: the first gives a general overview of the architecture and design of the CXF transport layer, describes how to create a custom transport, and outlines the use cases for doing so. The second part concentrates on the JMS transport and shows how to design scalable CXF applications using JMS.
 
Introduction
Presently the CXF distribution provides transport implementations for the following protocols: HTTP(S), JMS, JBI, and Local [REF-1]. The HTTP(S) and JMS transports support the corresponding protocols and interfaces; the JBI transport provides communication with JBI service engines and binding components; the Local transport is designed for optimized communication between participants in the same JVM. The Apache Camel project additionally provides a Camel transport for CXF [REF-2].
A new custom transport is normally required for a protocol not yet supported by CXF, for example UDP or FTP. Such a case can also be implemented with a Camel-based solution, but if that is not appropriate for some reason, a custom CXF transport is a valid alternative. New CXF transports can also be a solution for legacy ESB participants that have to be implemented using the standard JAX-WS interface but must communicate over a high-level protocol based on the old ESB (the JBI transport is an example of this use case). Let us analyse the CXF transport layer in detail.

CXF Transport Layer

Architecture and Design
The transport functionality is based on two fundamental definitions: conduit and destination. Conduits are responsible for sending messages to recipients, and destinations for receiving messages from senders. In order to send a response, a destination needs its own back-channel conduit (in case of request-response communication). Conduits and destinations are created by a TransportFactory. CXF selects the correct TransportFactory based on the transport URL. SOAP is also considered a high-level transport and has its own conduit and destination in CXF.
To send a message into a physical channel, the conduit needs access to the message content. Because CXF is streaming oriented, the normal practice is to use a subclass of OutputStream that extends CachedOutputStream. The message is written to this custom stream, which provides access to the content in streaming or buffered form depending on the transport requirements. CachedOutputStream is configured to keep the message in memory only up to a predefined size; if this size is exceeded, the message is swapped to disk.
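The "keep in memory up to a threshold, then swap to disk" behaviour described above can be illustrated with a small stand-alone sketch. The class ThresholdCachingStream and all of its members are hypothetical names invented for this illustration; it uses only the JDK and does not reproduce the actual CachedOutputStream implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration of threshold-based caching; this is NOT CXF's CachedOutputStream.
class ThresholdCachingStream extends OutputStream {
    private final int threshold;                  // maximum number of bytes kept in memory
    private final ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private OutputStream current = memory;        // where new bytes are written
    private Path tempFile;                        // non-null once the content was swapped to disk
    private long size;

    ThresholdCachingStream(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        if (tempFile == null && size + len > threshold) {
            swapToDisk();
        }
        current.write(buf, off, len);
        size += len;
    }

    // Copies what was buffered so far into a temporary file and continues writing there.
    private void swapToDisk() throws IOException {
        tempFile = Files.createTempFile("transport-sketch", ".tmp");
        tempFile.toFile().deleteOnExit();
        current = Files.newOutputStream(tempFile);
        memory.writeTo(current);
        memory.reset();
    }

    boolean isOnDisk() {
        return tempFile != null;
    }

    @Override
    public void flush() throws IOException {
        current.flush();
    }

    @Override
    public void close() throws IOException {
        current.close();
    }

    // Returns the complete content regardless of where it is currently stored.
    byte[] toByteArray() throws IOException {
        current.flush();
        return tempFile == null ? memory.toByteArray() : Files.readAllBytes(tempFile);
    }
}
```

With a threshold of 8 bytes, writing "hello" stays in memory, while a further write of " world!" moves the whole content to a temporary file; toByteArray() returns the same bytes in both cases.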
A class diagram of TransportFactory, Conduit, Destination and OutputStream is shown below:



How it Works
The interaction between a JAX-WS client and service using a CXF transport is shown in the following figure:


What happens in the transport layer on the client and service sides when a message is sent or received? Let us look at it in detail.

Client Workflow

    • Step 1: The JAX-WS client invokes a service, for example:
              URL wsdlURL = this.getClass().getResource("/HelloWorld.wsdl");
              HelloWorldService service = new HelloWorldService(wsdlURL, SERVICE_NAME);
              HelloWorld hw = service.getHelloWorldPort();
              String result = hw.sayHi(TEST_REQUEST);
      
        • Step 2: The CXF runtime selects the correct TransportFactory based on some criteria (described below)
        • Step 3: The CXF runtime calls the TransportFactory.getConduit() method to obtain the conduit
        • Step 4: The CXF runtime invokes Conduit.prepare() and passes the outgoing message as an argument
        • Step 5: The conduit sets its own OutputStream (normally one extending CachedOutputStream) as the outgoing message content
        • Step 6: The CXF runtime processes the outgoing message, calls the interceptor chain and writes the outgoing message to the conduit’s OutputStream. Messaging in CXF is stream-oriented, so the message is normally processed and sent as a stream rather than in one piece: the last bytes of the message can still be in processing while the first ones have already been sent to the recipient. It is the responsibility of the conduit to decide how to send the message: by streaming it, or by collecting the whole message and sending it at once
        • Step 7: When the CXF runtime has completely processed the outgoing message, it invokes the Conduit.close(Message) method. This means that the message has been completely written into the OutputStream. Correspondingly, the OutputStream.doClose() method will be called
        • Step 8: In the doClose() method, the OutputStream class has access to the whole marshalled outgoing message and the exchange, and sends this message to the service using the corresponding transport protocol. In case of streaming, part of the message may already have been sent to the network at this time, and the conduit just sends the last part and finishes sending the request
        • Step 9: In case of one-way communication, the exchange will be closed. Skip to Step 14
        • Step 10: In case of request-response communication, the conduit will wait for the service response in a synchronous or asynchronous manner
        • Step 11: When the response is received, the conduit creates a new message, sets its context and puts it in the exchange as the incoming message (In-Message). The content of the new message is also available as a stream, so the runtime and business logic can start processing the message even if it has not yet been completely received.
        • Step 12: When a fault is received, the conduit also creates a new Message, sets its context and puts it in the exchange as the in-fault message.
        • Step 13: The conduit notifies incomingObserver (a ClientImpl object) about the response using an incomingObserver.onMessage() call
        • Step 14: The Conduit.close(Message) method is invoked for the incoming message. Normally the conduit implementation decreases the reference count of the current network connection, potentially closing it if the count reaches zero.
        • Step 15: The JAX-WS client code receives the response in sync or async style
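The client steps above can be sketched with a few stand-in classes. Everything here (SketchMessage, SketchObserver, SketchWire, BufferingConduit, ClientWorkflowSketch) is a hypothetical simplification written only against the JDK; the real CXF Conduit, Message and MessageObserver interfaces carry considerably more state. The sketch takes the buffered approach: the whole message is collected and sent when the stream is closed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical stand-ins for CXF's Message, MessageObserver and Conduit (names are illustrative).
class SketchMessage {
    OutputStream out;   // outgoing content, installed by the conduit in prepare()
    InputStream in;     // incoming content, installed by the conduit for responses
}

interface SketchObserver {
    void onMessage(SketchMessage incoming) throws IOException;
}

interface SketchWire {
    byte[] exchange(byte[] request) throws IOException;   // pretends to be the physical protocol
}

class BufferingConduit {
    private final SketchWire wire;
    private SketchObserver incomingObserver;   // plays the role of ClientImpl

    BufferingConduit(SketchWire wire) {
        this.wire = wire;
    }

    void setMessageObserver(SketchObserver observer) {
        this.incomingObserver = observer;
    }

    // Steps 4-5: install the conduit's own stream as the outgoing message content.
    void prepare(SketchMessage outgoing) {
        outgoing.out = new ByteArrayOutputStream() {
            @Override
            public void close() throws IOException {
                // Step 8: the whole marshalled message is available; send it now.
                sendAndReceive(toByteArray());
            }
        };
    }

    // Step 7: called once the runtime has written the complete message.
    void close(SketchMessage outgoing) throws IOException {
        outgoing.out.close();
    }

    // Steps 10-13: obtain the response, wrap it in a new message, notify the observer.
    private void sendAndReceive(byte[] request) throws IOException {
        SketchMessage incoming = new SketchMessage();
        incoming.in = new ByteArrayInputStream(wire.exchange(request));
        incomingObserver.onMessage(incoming);
    }
}

class ClientWorkflowSketch {
    // Drives the conduit the way a runtime would; the fake service upper-cases the request.
    static String invoke(String payload) {
        BufferingConduit conduit = new BufferingConduit(request ->
            new String(request, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT)
                .getBytes(StandardCharsets.UTF_8));
        StringBuilder result = new StringBuilder();
        conduit.setMessageObserver(incoming ->
            result.append(new String(incoming.in.readAllBytes(), StandardCharsets.UTF_8)));
        SketchMessage outgoing = new SketchMessage();
        try {
            conduit.prepare(outgoing);                                      // Steps 4-5
            outgoing.out.write(payload.getBytes(StandardCharsets.UTF_8));   // Step 6
            conduit.close(outgoing);                                        // Steps 7-13
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result.toString();
    }
}
```

Calling ClientWorkflowSketch.invoke("hello") walks through prepare, write and close, and returns whatever the fake wire sent back. A streaming conduit would differ only in what its OutputStream does on each write.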

          Service Workflow
          • Step 1: The JAX-WS service is registered, for example:
               HelloWorldImpl serverImpl = new HelloWorldImpl();
               Endpoint.publish("udp://localhost:9000/hello", serverImpl);
          
          • Step 2: The CXF runtime selects the correct TransportFactory based on some criteria (described below)
          • Step 3: The CXF runtime calls the TransportFactory.getDestination() method to obtain the destination
          • Step 4: As soon as the CXF runtime activates the endpoint (adds a listener, etc.), the Destination.activate() method is automatically invoked
          • Step 5: The implementation of Destination.activate() normally opens network connections and listens for incoming requests
          • Step 6: When a request arrives, the destination creates a message, sets the content and notifies the message observer (a ChainInitiationObserver object) about the request via incomingObserver.onMessage(). The message content is saved as a stream, so the runtime and business logic can start processing a message that has not yet been completely received. Normally the incoming connection is saved in a correlation map so that it can be retrieved when the corresponding response is sent.
          • Step 7: The business service implementation is called with the request message in stream form. In case of one-way communication the exchange is now finished. In case of request-response, the business implementation either returns a response or throws a fault exception.
          • Step 8: The CXF runtime requests a back-channel conduit from the destination via Destination.getInbuiltBackChannel()
          • Step 9: The back-channel conduit's prepare() method is called with the response message as an argument
          • Step 10: The back-channel conduit sets its own OutputStream as the message content
          • Step 11: The CXF runtime processes the response message, calls the interceptor chain and invokes Conduit.close(Message) for the response message.
          • Step 12: Finally, the OutputStream.doClose() method is invoked for the response message
          • Step 13: In the doClose() method, the OutputStream class has access to the marshalled response message and sends this message through the network as a response to the client. In case of streaming, part of the message may already have been sent to the network at this time, and the conduit just sends the last part and finishes sending. Normally the incoming connection for the given protocol is cached and created only if necessary.
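The service-side steps, including the correlation map and the back-channel, might be sketched as follows. SketchDestination and its methods are hypothetical names for this illustration only; the "connection" is modelled as a plain OutputStream and the interceptor chain plus business code as a single function, which is far simpler than a real Destination.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a destination; it is not CXF's Destination interface.
class SketchDestination {
    // Correlation map: request id -> connection on which the response must be written (Step 6).
    private final Map<String, OutputStream> connections = new ConcurrentHashMap<>();
    private final UnaryOperator<String> service;   // interceptor chain plus business code
    private volatile boolean active;

    SketchDestination(UnaryOperator<String> service) {
        this.service = service;
    }

    // Steps 4-5: a real transport would open sockets or subscribe to a queue here.
    void activate() {
        active = true;
    }

    // Steps 6-7: called by the (imaginary) network listener for every incoming request.
    void onRequest(String requestId, byte[] payload, OutputStream connection) throws IOException {
        if (!active) {
            throw new IllegalStateException("destination is not active");
        }
        connections.put(requestId, connection);
        String response = service.apply(new String(payload, StandardCharsets.UTF_8));
        // Steps 8-11: obtain the back channel and write the response into its stream.
        try (OutputStream backChannel = getBackChannel(requestId)) {
            backChannel.write(response.getBytes(StandardCharsets.UTF_8));
        }
    }

    // Steps 12-13: the back-channel stream sends the buffered response when it is closed.
    private OutputStream getBackChannel(String requestId) {
        return new ByteArrayOutputStream() {
            @Override
            public void close() throws IOException {
                OutputStream connection = connections.remove(requestId);
                if (connection != null) {
                    connection.write(toByteArray());
                    connection.flush();
                }
            }
        };
    }
}
```

A caller activates the destination, then passes a request id, the request bytes and the stream on which the client expects its answer; once onRequest returns, the response has been written to that stream and the correlation entry removed.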

          Registration of Transport Factory
          There are two ways to register a transport factory: programmatically or via Spring configuration.
          To register a transport factory programmatically, execute the following code:
          CustomTransportFactory transportFactory = new CustomTransportFactory(); 
          Bus bus = BusFactory.getThreadDefaultBus();     
          DestinationFactoryManagerImpl dfm = bus.getExtension(DestinationFactoryManagerImpl.class);
          dfm.registerDestinationFactory(TRANSPORT_IDENTIFIER, transportFactory); 
          ConduitInitiatorManager extension = bus.getExtension(ConduitInitiatorManager.class);      
          extension.registerConduitInitiator(TRANSPORT_IDENTIFIER, transportFactory);
          TRANSPORT_IDENTIFIER is a unique transport id (normally in the form "http://apache.org/transports/PROTOCOL_PREFIX").

          For Spring configuration, the following could be used instead:

          <bean class="org.company.cxf.transport.CustomTransportFactory"
              lazy-init="false">
              <property name="transportIds">
                  <list>
                      <value>TRANSPORT_IDENTIFIER</value>
                  </list>
              </property>
          </bean>

          How CXF chooses the TransportFactory
          Once the TransportFactory is registered, a CXF participant can send or receive messages. How does CXF find the correct TransportFactory to do this?

          The selection is performed in two steps:

          1. Binding TransportFactory selection
          CXF interprets bindings such as SOAP as high-level transports and also chooses an appropriate TransportFactory for them. A TransportFactory provides its list of transport IDs through the method TransportFactory.getTransportIds(). If this list contains the value of the binding transport attribute and the binding namespace defined in the WSDL document, CXF will select this TransportFactory:
          WSDL:
          <wsdl:definitions xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/" …>
              …
              …
              <wsdl:binding name="GreeterPortBinding" type="tns:GreeterPortType">
                  <soap:binding style="document"
                      transport="http://schemas.xmlsoap.org/soap/http" />
                  …
              </wsdl:binding>
              <wsdl:service name="GreeterService">
                  <wsdl:port binding="tns:GreeterPortBinding" name="GreeterPort">
                      <transport:address location="LOCATION_URL" />
                  </wsdl:port>
              </wsdl:service>
          </wsdl:definitions>

          TransportFactory class:
              …
              public static final List<String> DEFAULT_NAMESPACES = Arrays.asList(
                  "http://schemas.xmlsoap.org/soap/",
                  "http://schemas.xmlsoap.org/wsdl/soap/",
                  "http://schemas.xmlsoap.org/wsdl/soap12/",
                  "http://schemas.xmlsoap.org/soap/http/",
                  "http://schemas.xmlsoap.org/wsdl/soap/http",
                  "http://www.w3.org/2010/soapjms/",
                  "http://www.w3.org/2003/05/soap/bindings/HTTP/",
                  "http://schemas.xmlsoap.org/soap/http");

              public final List<String> getTransportIds() {
                  return DEFAULT_NAMESPACES;
              }

          2. Protocol TransportFactory selection
          Once the binding TransportFactory is found, CXF looks for the protocol TransportFactory responsible for the physical network communication. The important method here is TransportFactory.getUriPrefixes(), which returns the list of protocol prefixes supported by the TransportFactory.
          When a CXF client or service tries to communicate using a URL with a given protocol prefix (http://, https://, jms://, local://), CXF looks in the map of registered transport factories and picks the one for this prefix. If no TransportFactory is found for the protocol, CXF throws a corresponding exception.
          Client configuration:
          <jaxws:client id="FlightReservationClient"
              xmlns:serviceNamespace="http://www.apache.org/cxf/samples/FlightReservation"
              serviceClass="org.apache.cxf.samples.flightreservation.FlightReservation"
              serviceName="serviceNamespace:FlightReservationService"
              endpointName="serviceNamespace:FlightReservationSOAP"
              address="http://localhost:8040/services/FlightReservationService">
          </jaxws:client>
          …

          TransportFactory class:
              …
              private static final Set<String> URI_PREFIXES = new HashSet<String>();
              static {
                  URI_PREFIXES.add("http://");
                  URI_PREFIXES.add("https://");
              }

              public Set<String> getUriPrefixes() {
                  return URI_PREFIXES;
              }

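The two-step selection described in this section can be mimicked with a small registry. SketchTransportFactory and SketchFactoryRegistry are hypothetical classes for illustration; they do not use CXF's DestinationFactoryManager or ConduitInitiatorManager, and the transport ids registered in example() are sample values chosen for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical registry illustrating the two lookups; it does not use CXF's own manager classes.
class SketchTransportFactory {
    final String name;
    final List<String> transportIds;   // what getTransportIds() would return
    final Set<String> uriPrefixes;     // what getUriPrefixes() would return

    SketchTransportFactory(String name, List<String> transportIds, Set<String> uriPrefixes) {
        this.name = name;
        this.transportIds = transportIds;
        this.uriPrefixes = uriPrefixes;
    }
}

class SketchFactoryRegistry {
    private final List<SketchTransportFactory> factories = new ArrayList<>();

    void register(SketchTransportFactory factory) {
        factories.add(factory);
    }

    // Step 1: find the factory whose transport ids contain the value taken from the WSDL binding.
    SketchTransportFactory forTransportId(String transportId) {
        for (SketchTransportFactory factory : factories) {
            if (factory.transportIds.contains(transportId)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No transport factory for id " + transportId);
    }

    // Step 2: find the factory that supports the protocol prefix of the endpoint address.
    SketchTransportFactory forAddress(String address) {
        for (SketchTransportFactory factory : factories) {
            for (String prefix : factory.uriPrefixes) {
                if (address.startsWith(prefix)) {
                    return factory;
                }
            }
        }
        throw new IllegalArgumentException("No transport factory for address " + address);
    }

    // Sample registrations; the ids for "http" and "udp" are illustrative assumptions.
    static SketchFactoryRegistry example() {
        SketchFactoryRegistry registry = new SketchFactoryRegistry();
        registry.register(new SketchTransportFactory("soap",
            List.of("http://schemas.xmlsoap.org/soap/http"), Set.of()));
        registry.register(new SketchTransportFactory("http",
            List.of("http://cxf.apache.org/transports/http"), Set.of("http://", "https://")));
        registry.register(new SketchTransportFactory("udp",
            List.of("http://apache.org/transports/udp"), Set.of("udp://")));
        return registry;
    }
}
```

With the sample registry, the SOAP transport attribute from the WSDL resolves to the "soap" factory, an https:// address resolves to the "http" factory, and an address with an unregistered prefix such as ftp:// raises an exception, mirroring the behaviour described above.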
          Conduit and Destination Lifecycle
          Destinations are normally created by the service on startup and released on shutdown. Conduits can either be recreated for each request or cached, based on endpoint information, for the whole lifetime of the client. Clients can make concurrent calls to endpoints using different protocols and bind them to different conduits.

          Concurrency Aspects
          Conduit and destination objects can be accessed concurrently by multiple threads. Implementations should take care of the thread safety of these classes.
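One way to combine conduit caching per endpoint with thread safety is a concurrent map keyed by the endpoint address. SketchConduitCache and CachedConduit are hypothetical names; this is a minimal sketch of the idea rather than the way CXF itself caches conduits.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical cache that hands out one conduit per endpoint address, safely across threads.
class SketchConduitCache {
    static final class CachedConduit {
        final String address;

        CachedConduit(String address) {
            this.address = address;
        }
    }

    private final Map<String, CachedConduit> conduits = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    // computeIfAbsent guarantees that at most one conduit is created per address,
    // even if several threads ask for the same endpoint at the same time.
    CachedConduit conduitFor(String endpointAddress) {
        return conduits.computeIfAbsent(endpointAddress, address -> {
            created.incrementAndGet();
            return new CachedConduit(address);
        });
    }

    int createdCount() {
        return created.get();
    }

    // Called at the end of the client lifetime to release all cached conduits.
    void shutdown() {
        conduits.clear();
    }
}
```

Repeated calls with the same address return the same conduit instance, while a different address or protocol gets its own conduit.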

          Streaming
          It is strongly recommended not to break streaming in Conduit and Destination implementations if the physical protocol supports it. CXF is completely streaming oriented, which results in high performance and scalability.
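To make the difference from a fully buffering stream concrete, the sketch below forwards data to the wire in fixed-size chunks as soon as they fill up, so the first bytes leave before the message is complete. ChunkForwardingStream and RecordingWire are hypothetical classes written for this illustration; a real transport would choose chunking according to its protocol.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical streaming-friendly stream: sends each full chunk immediately instead of
// collecting the whole message first.
class ChunkForwardingStream extends OutputStream {
    private final OutputStream wire;
    private final byte[] chunk;
    private int used;

    ChunkForwardingStream(OutputStream wire, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.wire = wire;
        this.chunk = new byte[chunkSize];
    }

    @Override
    public void write(int b) throws IOException {
        chunk[used++] = (byte) b;
        if (used == chunk.length) {
            sendChunk();
        }
    }

    private void sendChunk() throws IOException {
        wire.write(chunk, 0, used);
        wire.flush();
        used = 0;
    }

    // On close only the remaining partial chunk is left to send.
    @Override
    public void close() throws IOException {
        if (used > 0) {
            sendChunk();
        }
        wire.close();
    }
}

// Fake wire that records the size of every chunk it receives.
class RecordingWire extends OutputStream {
    final List<Integer> chunkSizes = new ArrayList<>();

    @Override
    public void write(int b) {
        chunkSizes.add(1);
    }

    @Override
    public void write(byte[] buf, int off, int len) {
        chunkSizes.add(len);
    }
}
```

Writing 10 bytes with a chunk size of 4 puts two chunks of 4 bytes on the wire before close() is called; close() then sends the remaining 2 bytes.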

          How to start
          What is the starting point for understanding the CXF transport layer and implementing your own transport? I would recommend reading the CXF documentation [REF-1] and analysing the source code of the existing CXF transports (the Local and JMS ones are the most straightforward). They are located in the packages org.apache.cxf.transport.local and org.apache.cxf.transport.jms respectively.
           
          Conclusion
          CXF provides a very flexible and pluggable transport layer. It is possible to configure the standard transport implementations delivered with CXF as well as to implement and integrate a new custom transport. Creating a custom transport in CXF is straightforward.
          Some Apache projects can be easily integrated using CXF transports: Camel provides the possibility to bind CXF participants directly to a Camel route; the JBI transport simplifies communication with ServiceMix applications.

          References
          REF-1: CXF transports overview http://cxf.apache.org/docs/transports.html  
          REF-2: Camel CXF transport http://camel.apache.org/camel-transport-for-cxf.html