Molko

Apache Camel - Unable to propagate JMS Header Properties between Request - Response

I am trying to save a value on the Camel Exchange between a Request - Response invocation against a QPID endpoint.

You can see from my code that I set a Header (and Property) before I invoke the Endpoint. Upon return, the same Header and Property values are null.

I basically want to keep track of the fileName and filePath so that I can write the results into the same location.

Really struggling with this.

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Value;

public class ProcessingRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

    //@formatter:off
        from("file:/home/molko/in/?recursive=true&include=.*.txt")
            .log("File read from disk : ${file:name}")
            .doTry()
                .setHeader("JMSReplyTo", constant("response-1; {create:always, node:{type:queue}}"))
                .setHeader("JMSCorrelationID", constant(java.util.UUID.randomUUID().toString()))

                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {

                        final String fileParent = exchange.getIn().getHeader("CamelFileParent", String.class);
                        final String endPath = fileParent.substring(fileParent.lastIndexOf('/') + 1);

                        exchange.getIn().setHeader("endPath", endPath);
                        exchange.setProperty("endPath", endPath);
                    }

                })                  

                .to("amqp:request-1;{node:{type:queue}}?preserveMessageQos=true&exchangePattern=InOut")
            .doCatch(Exception.class)
                .log("Failed : ${file:name}")
                .log("${exception.stacktrace}")
            .stop();

        from("amqp:response-1; {create:always, node:{type:queue}}")
            .log("Received from qpid broker : ${date:now}")
            .doTry()
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {

                        byte[] response = exchange.getIn().getBody(byte[].class);

                        System.out.println("properties : " + exchange.getProperties());
                        System.out.println("headers : " + exchange.getIn().getHeaders());
                    }
                })
                .to("file:/home/molko/out")
            .doCatch(Exception.class)
                .log("Failed from qpid broker : ${date:now}")
                .log("${exception.stacktrace}")
            .stop();
    //@formatter:on
    }
}

ASKER CERTIFIED SOLUTION
gurpsbassi

Molko

ASKER

Hi
Thanks for your comments.

The first route consumes files from disk and posts them to qpid; the second route consumes the results from qpid and persists them to disk.

After setting the property, try exchange.setOut(exchange.getIn()) so that the exchange gets preserved.
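
Since the two routes are separate, the second route starts a brand new Exchange for each reply, so a property set in the first route cannot show up there, and a custom header only comes back if the responder copies it onto the reply. One possible workaround, sketched below under assumptions, is to keep the endPath on the sending side, keyed by the correlation ID. CorrelationStore, remember and recall are hypothetical names, not Camel API. The sketch assumes both routes run in the same JVM, that the responder echoes JMSCorrelationID on the reply, and that the ID is generated per message (constant(UUID.randomUUID().toString()) is evaluated once, when the route is built, so every file would share the same ID).

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of Camel): holds request-side state
// keyed by correlation ID so the reply route can look it up again.
final class CorrelationStore {

    private final Map<String, String> endPathById = new ConcurrentHashMap<>();

    // Intended to be called from the first route's Processor, before sending.
    void remember(String correlationId, String endPath) {
        endPathById.put(correlationId, endPath);
    }

    // Intended to be called from the second route's Processor.
    // Removes the entry so the map does not grow without bound.
    // Returns null when the ID is missing or unknown.
    String recall(String correlationId) {
        if (correlationId == null) {
            return null;
        }
        return endPathById.remove(correlationId);
    }

    public static void main(String[] args) {
        CorrelationStore store = new CorrelationStore();

        // Assumption: one fresh ID per file, set as JMSCorrelationID on the request.
        String correlationId = UUID.randomUUID().toString();
        store.remember(correlationId, "batchA");

        // Later, on the reply, read JMSCorrelationID from the headers and look it up.
        System.out.println("endPath for reply: " + store.recall(correlationId));
    }
}
```

In the routes, the first Processor would call remember(...) with the ID it puts into the JMSCorrelationID header, and the second Processor would call recall(...) with the JMSCorrelationID header of the reply. Entries for requests that never get a reply stay in the map, so some expiry would be needed in practice.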