camel-users mailing list archives

From Chris <cwolf.a...@gmail.com>
Subject Re: AW: How to set a header in custom DataFormat?
Date Fri, 20 Sep 2013 16:38:22 GMT
I spoke too soon... It works for unmarshalling, but not for marshalling 
- it's impossible to set new headers, attachments, etc. from a 
DataFormat's marshal method.
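
For the unmarshalling side, the result handling described in the quoted
messages below can be modelled in a few lines of plain Java. This is only a
rough sketch: Msg, Exch and Unmarshaller are hypothetical stand-ins, not Camel
classes, and the step that prepares 'out' as a copy of 'in' before the data
format runs is an assumption made to illustrate why a header set on 'in'
could go missing.

```java
import java.util.HashMap;
import java.util.Map;

public class UnmarshalDispatchSketch {

    /** Hypothetical stand-in for a message: a body plus headers. */
    static class Msg {
        Object body;
        final Map<String, Object> headers = new HashMap<>();

        Msg copy() {
            Msg m = new Msg();
            m.body = body;
            m.headers.putAll(headers);
            return m;
        }
    }

    /** Hypothetical stand-in for an exchange with an 'in' and an 'out' message. */
    static class Exch {
        Msg in = new Msg();
        Msg out;
    }

    /** Hypothetical stand-in for the unmarshal side of a data format. */
    interface Unmarshaller {
        Object unmarshal(Exch exchange, String payload);
    }

    /** Sets the header on 'in' and returns the plain result. */
    static final Unmarshaller PLAIN = (ex, payload) -> {
        ex.in.headers.put("DISAPPEARING_DEMO_HEADER", "Will it be set?");
        return payload.split("\n");
    };

    /** Wraps the result in a new message and sets the header on that message. */
    static final Unmarshaller WRAPPED = (ex, payload) -> {
        Msg msg = ex.in.copy();
        msg.body = payload.split("\n");
        msg.headers.put("DISAPPEARING_DEMO_HEADER", "Will it be set?");
        return msg;
    };

    /** Models the three-way dispatch on the value returned by unmarshal. */
    static void process(Exch exchange, Unmarshaller format) {
        // Assumption: 'out' is prepared from 'in' BEFORE the data format runs,
        // so later changes to 'in' are not reflected in 'out'.
        exchange.out = exchange.in.copy();
        Object result = format.unmarshal(exchange, (String) exchange.in.body);
        if (result instanceof Exch) {
            if (result != exchange) {
                throw new IllegalStateException("a different exchange was returned");
            }
        } else if (result instanceof Msg) {
            exchange.out = (Msg) result;   // returned message replaces 'out'
        } else {
            exchange.out.body = result;    // anything else becomes the 'out' body
        }
    }

    /** Returns true if the new header is visible on 'out' after processing. */
    static boolean headerSurvives(Unmarshaller format) {
        Exch exchange = new Exch();
        exchange.in.body = "one\ntwo\nthree";
        exchange.in.headers.put("DEMO_HEADER", "Hello...");
        process(exchange, format);
        return exchange.out.headers.containsKey("DISAPPEARING_DEMO_HEADER");
    }

    public static void main(String[] args) {
        System.out.println("plain result keeps header:   " + headerSurvives(PLAIN));
        System.out.println("wrapped result keeps header: " + headerSurvives(WRAPPED));
    }
}
```

In this model the plain variant loses the new header and the wrapped variant
keeps it, which matches the unmarshal behaviour reported in the thread. It
says nothing about how marshal is handled.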

On 9/20/2013 11:57 AM, Chris wrote:
> Ok, I solved the issue.  After reading the source code of
> org.apache.camel.processor.UnmarshalProcessor, I saw this little piece
> of code:
>
>              } else if (result instanceof Message) {
>                  // the dataformat has probably set headers, attachments, etc.
>                  // so let's use it as the outbound payload
>                  exchange.setOut((Message) result);
>
> Which means if a DataFormat implementation wants to set headers and/or
> attachments, the unmarshal implementation cannot directly return the
> result - it must create a new Message, set the result as the body of
> the new Message, and return this new Message instead.
>
> I have to say, I never saw this documented anywhere nor any example in
> the book.  Of course I understand that there are a lot of higher
> priority items to do, so I'm not complaining ;)
>
> Well here's the revised sample DataFormat where you *can* successfully
> set new headers in the unmarshal implementation:
>
> Regards,
>
> Chris
>
>
> class CustomDataFormat2 implements DataFormat {
>
>      @Override
>      public Object unmarshal(Exchange exchange, InputStream stream)
>              throws Exception {
>
>          // To be able to set new headers and/or attachments, etc.
>          // we need to wrap the result in a new Message and return
>          // the new Message instead... who knew?
>          Message msg = exchange.getIn().copy();
>
>          List<String> result = new ArrayList<String>();
>
>          BufferedReader in
>              = new BufferedReader(new InputStreamReader(stream));
>
>          String line = null;
>          while((line = in.readLine()) != null) {
>              result.add(line);
>          }
>
>          msg.setBody(result);
>
>          // now it works!
>          msg.setHeader("DISAPPEARING_DEMO_HEADER", "Will it be set?");
>          return msg;
>      }
>
>      @Override
>      public void marshal(Exchange exchange, Object graph,
>              OutputStream stream)
>              throws Exception {
>          throw new UnsupportedOperationException("Not implemented.");
>      }
> }
>
>
> On 9/20/2013 10:55 AM, Chris wrote:
>> Jan,
>>
>> I really appreciate the help, unfortunately your test case does not even
>> reproduce the issue I am having.  I think I may not have been clear in
>> my original message.
>>
>> The issue is in a custom DataFormat: if I set a *new* header, that
>> header will be gone from the Exchange for the rest of the route,
>> downstream from the unmarshal (or marshal) call.
>>
>> So the issue is just setting a header and checking if it's still there.
>> The best way to recreate the problem is to create the simplest possible
>> DataFormat, because marshal/unmarshal is not the issue - the issue is
>> setting a new header *inside* the custom marshal/unmarshal methods. So I
>> created a test case whose custom DataFormat does the bare minimum - no
>> file I/O, no serialization - just string manipulation.
>>
>> Maybe you and/or others can have a look and explain why there's an issue.
>>
>> Thanks,
>>
>> Chris
>>
>> (the code formatting will be wrecked by the mailing list line length
>> limit)
>>
>> import java.io.BufferedReader;
>> import java.io.InputStream;
>> import java.io.InputStreamReader;
>> import java.io.OutputStream;
>> import java.util.ArrayList;
>> import java.util.List;
>>
>>
>> import org.apache.camel.Exchange;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.component.mock.MockEndpoint;
>> import org.apache.camel.impl.JndiRegistry;
>> import org.apache.camel.spi.DataFormat;
>> import org.apache.camel.test.junit4.CamelTestSupport;
>> import org.junit.Test;
>>
>>
>> public class SetHeaderDemo extends CamelTestSupport {
>>
>>      @Test
>>      public void setHeaderInDataFormatProblem() throws Exception {
>>          MockEndpoint mock = getMockEndpoint("mock:result");
>>          mock.expectedMessageCount(1);
>>          template.sendBodyAndHeader("direct:start", "one\ntwo\nthree\n",
>>                  "DEMO_HEADER", "Hello...");
>>
>>          // This passes
>>          assertExpression(mock.getReceivedExchanges().get(0),
>>                  "simple", "${in.header.DEMO_HEADER}", "Hello...");
>>
>>          // This FAILS... WHY??? *************
>>          assertExpression(mock.getReceivedExchanges().get(0),
>>                  "simple", "${in.header.DISAPPEARING_DEMO_HEADER}",
>>                  "Will it be set?");
>>
>>          assertMockEndpointsSatisfied();
>>      }
>>
>>
>>      @Override
>>      protected RouteBuilder createRouteBuilder() throws Exception {
>>
>>          return new RouteBuilder() {
>>              @Override
>>              public void configure() {
>>                  from("direct:start")
>>                  // will try to add a new header to the Exchange IN msg
>>                  .unmarshal("customFmt")
>>                  // the new header is not there
>>                  .to("log://dataformat.demo?showAll=true&multiline=true&level=INFO")
>>                  .to("mock:result");
>>              }
>>          };
>>      }
>>
>>      @Override
>>      protected JndiRegistry createRegistry() throws Exception {
>>          JndiRegistry registry = super.createRegistry();
>>
>>          CustomDataFormat fmt = new CustomDataFormat();
>>          registry.bind("customFmt", fmt);
>>          return registry;
>>      }
>> }
>>
>> /**
>>   * Simplest, contrived DataFormat impl to demonstrate that it's
>>   * impossible to set a new header in the DataFormat's implementation
>>   * methods and see the newly added header downstream from the
>>   * marshal/unmarshal call(s).
>>   *
>>   */
>> class CustomDataFormat implements DataFormat {
>>
>>      /**
>>       * Expects the body to be a newline-delimited list of strings,
>>       * which will be unmarshalled to a list of strings, whose elements
>>       * are the "lines" in the "document".
>>       *
>>       * Obviously the marshal/unmarshal process is not important - the
>>       * issue is that if a new header is added in the DataFormat marshal
>>       * or unmarshal, it will be GONE after returning.
>>       */
>>      @Override
>>      public Object unmarshal(Exchange exchange, InputStream stream)
>>              throws Exception {
>>          List<String> result = new ArrayList<String>();
>>
>>          BufferedReader in
>>              = new BufferedReader(new InputStreamReader(stream));
>>
>>          String line = null;
>>          while((line = in.readLine()) != null) {
>>              result.add(line);
>>          }
>>
>>          exchange.getIn().setHeader("DISAPPEARING_DEMO_HEADER",
>>                  "Will it be set?");
>>          return result;
>>      }
>>
>>      @Override
>>      public void marshal(Exchange exchange, Object graph,
>>              OutputStream stream)
>>              throws Exception {
>>          throw new UnsupportedOperationException("Not implemented.");
>>      }
>> }
>>
>>
>>
>>
>> On 9/19/2013 4:13 AM, Jan Matèrne (jhm) wrote:
>>> I tried building my own DF and that works (for me)
>>>
>>> Jan
>>>
>>>
>>> package org.apache.camel.dataformat;
>>>
>>> import java.io.File;
>>> import java.io.FileInputStream;
>>> import java.io.FileNotFoundException;
>>> import java.io.FileOutputStream;
>>> import java.io.InputStream;
>>> import java.io.ObjectInputStream;
>>> import java.io.ObjectOutputStream;
>>> import java.io.OutputStream;
>>> import java.util.HashMap;
>>> import java.util.Map;
>>>
>>> import javax.activation.DataHandler;
>>>
>>> import org.apache.camel.Exchange;
>>> import org.apache.camel.Message;
>>> import org.apache.camel.Processor;
>>> import org.apache.camel.builder.ExchangeBuilder;
>>> import org.apache.camel.builder.RouteBuilder;
>>> import org.apache.camel.impl.DefaultMessage;
>>> import org.apache.camel.spi.DataFormat;
>>> import org.apache.camel.test.junit4.CamelTestSupport;
>>> import org.apache.camel.util.FileUtil;
>>> import org.apache.commons.io.IOUtils;
>>> import org.junit.After;
>>> import org.junit.Before;
>>> import org.junit.Test;
>>>
>>> public class SaveHeaderTest extends CamelTestSupport {
>>>
>>>     private static File dataFile = new File("output/message.ser");
>>>
>>>     DataFormat customDataFormat = new CustomDataFormat();
>>>
>>>     @After
>>>     @Before
>>>     public void cleanup() {
>>>         FileUtil.deleteFile(dataFile);
>>>     }
>>>
>>>     @SuppressWarnings("unchecked")
>>>     @Test
>>>     public void save() throws FileNotFoundException, Exception {
>>>         File writtenTo = new File("output/message.ser");
>>>         assertFalse(writtenTo.exists());
>>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>>                 .withBody("Hello World")
>>>                 .withHeader("from", "Apache Camel")
>>>                 .withHeader("test", "save")
>>>                 .withHeader(Exchange.FILE_NAME, "message.ser")
>>>                 .build();
>>>         template.send("direct:save", exchange);
>>>         assertTrue(writtenTo.exists());
>>>
>>>         // actual
>>>         ObjectInputStream in = new ObjectInputStream(
>>>                 new FileInputStream(dataFile));
>>>         Object body = in.readObject();
>>>         Map<String, Object> headers =
>>>                 (Map<String, Object>) in.readObject();
>>>         Map<String, DataHandler> attachments =
>>>                 (Map<String, DataHandler>) in.readObject();
>>>         String messageId = (String) in.readObject();
>>>         boolean fault = (boolean) in.readObject();
>>>
>>>         assertEquals("Hello World", body);
>>>         assertEquals("Apache Camel", headers.get("from"));
>>>         assertEquals("save", headers.get("test"));
>>>         assertTrue(attachments.isEmpty());
>>>         assertEquals(exchange.getIn().getMessageId(), messageId);
>>>         assertFalse(fault);
>>>     }
>>>
>>>     @Test
>>>     public void load() throws Exception {
>>>         // create test data
>>>         ObjectOutputStream out = new ObjectOutputStream(
>>>                 new FileOutputStream(dataFile));
>>>         out.writeObject("Hello World");
>>>
>>>         Map<String,Object> headers = new HashMap<String, Object>();
>>>         headers.put("from", "Apache Camel");
>>>         headers.put("test", "load");
>>>         out.writeObject(headers);
>>>
>>>         Map<String, DataHandler> attachments =
>>>                 new HashMap<String, DataHandler>();
>>>         out.writeObject(attachments);
>>>
>>>         out.writeObject("message-id");
>>>         out.writeObject(false);
>>>         out.close();
>>>
>>>         // load the saved 'exchange'
>>>         Exchange loadCommand = ExchangeBuilder.anExchange(context)
>>>                 .withBody(dataFile.getAbsolutePath())
>>>                 .build();
>>>         Exchange response = template.send("direct:load", loadCommand);
>>>         assertEquals("Hello World",
>>>                 response.getOut().getBody(String.class));
>>>         assertEquals("Apache Camel",
>>>                 response.getOut().getHeader("from"));
>>>         assertEquals("load", response.getOut().getHeader("test"));
>>>     }
>>>
>>>     @Test
>>>     public void turnaround() {
>>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>>                 .withBody("Hello World")
>>>                 .withHeader("from", "Apache Camel")
>>>                 .build();
>>>         Exchange response = template.send("direct:turnaround", exchange);
>>>         assertEquals("Hello World",
>>>                 response.getOut().getBody(String.class));
>>>         assertEquals("Apache Camel",
>>>                 response.getOut().getHeader("from"));
>>>     }
>>>
>>>
>>>     @Override
>>>     protected RouteBuilder createRouteBuilder() throws Exception {
>>>         return new RouteBuilder() {
>>>             @Override
>>>             public void configure() throws Exception {
>>>                 from("direct:save")
>>>                     .marshal(customDataFormat)
>>>                     .to("file:output");
>>>                 from("direct:load")
>>>                     .process(openFile())
>>>                     .unmarshal(customDataFormat);
>>>                 from("direct:turnaround")
>>>                     .marshal(customDataFormat)
>>>                     .unmarshal(customDataFormat);
>>>             }
>>>
>>>             private Processor openFile() {
>>>                 return new Processor() {
>>>                     @Override
>>>                     public void process(Exchange exchange)
>>>                             throws Exception {
>>>                         String filename =
>>>                                 exchange.getIn().getBody(String.class);
>>>                         FileInputStream stream =
>>>                                 new FileInputStream(filename);
>>>                         exchange.getIn().setBody(stream);
>>>                     }
>>>                 };
>>>             }
>>>         };
>>>     }
>>>
>>>     class CustomDataFormat implements DataFormat {
>>>
>>>         @Override
>>>         public void marshal(Exchange exchange, Object graph,
>>>                 OutputStream stream)
>>>                 throws Exception {
>>>             ObjectOutputStream out = null;
>>>
>>>             try {
>>>                 out = new ObjectOutputStream(stream);
>>>                 // Save the body
>>>                 out.writeObject(graph);
>>>                 // Save the message
>>>                 Message message = exchange.getIn();
>>>                 if (message != null) {
>>>                     // Message is not serializable, so save the
>>>                     // data individually
>>>                     out.writeObject(message.getHeaders());
>>>                     out.writeObject(message.getAttachments());
>>>                     out.writeObject(message.getMessageId());
>>>                     out.writeObject(message.isFault());
>>>                 }
>>>             } finally {
>>>                 IOUtils.closeQuietly(out);
>>>             }
>>>         }
>>>
>>>         @SuppressWarnings("unchecked")
>>>         @Override
>>>         public Object unmarshal(Exchange exchange, InputStream stream)
>>>                 throws Exception {
>>>
>>>             ObjectInputStream in = null;
>>>             try {
>>>                 in = new ObjectInputStream(stream);
>>>
>>>                 // read the raw data
>>>                 Object body = in.readObject();
>>>                 Map<String, Object> headers =
>>>                         (Map<String, Object>) in.readObject();
>>>                 Map<String, DataHandler> attachments =
>>>                         (Map<String, DataHandler>) in.readObject();
>>>                 String messageId = (String) in.readObject();
>>>                 boolean fault = (boolean) in.readObject();
>>>
>>>                 // build the message
>>>                 Message msg = new DefaultMessage();
>>>                 msg.setBody(body);
>>>                 msg.setAttachments(attachments);
>>>                 msg.setHeaders(headers);
>>>                 msg.setMessageId(messageId);
>>>                 msg.setFault(fault);
>>>
>>>                 return msg;
>>>             } finally {
>>>                 IOUtils.closeQuietly(in);
>>>             }
>>>         }
>>>     }
>>>
>>> }
>>>
>>>> -----Original Message-----
>>>> From: Jan Matèrne (jhm) [mailto:apache@materne.de]
>>>> Sent: Thursday, 19 September 2013 07:06
>>>> To: users@camel.apache.org
>>>> Subject: AW: How to set a header in custom DataFormat?
>>>>
>>>> What I have found is that the exchange object which is passed to the
>>>> DataFormat is only used for getting the CamelContext.
>>>> I searched a little further and found a processor for
>>>> unmarshalling:
>>>>
>>>> org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
>>>> AsyncCallback)
>>>>
>>>> And there is a note
>>>>
>>>>    Object result = dataFormat.unmarshal(exchange, stream);
>>>>    if (result instanceof Exchange) {
>>>>        if (result != exchange) {
>>>>            // it's not allowed to return another exchange other than the
>>>>            // one provided to dataFormat
>>>>            throw new RuntimeCamelException("The returned exchange " + result
>>>>                + " is not the same as " + exchange + " provided to the DataFormat");
>>>>
>>>>
>>>> The logic of that processor is:
>>>> - get the object from the stream
>>>> - if it is an exchange other than the one passed in, throw that exception
>>>> - if it is a message, store it as 'out' on the exchange parameter
>>>> - if it is something else, store it as the body of the 'out' message
>>>>
>>>>
>>>> So have you tried setting the header on the 'out' message?
>>>>
>>>>
>>>> Jan
>>>>
>>>>
>>>>
>>>>> -----Original Message-----
>>>>> From: Chris [mailto:cwolf.algo@gmail.com]
>>>>> Sent: Wednesday, 18 September 2013 22:58
>>>>> To: users@camel.apache.org
>>>>> Subject: How to set a header in custom DataFormat?
>>>>>
>>>>> I implemented a custom DataFormat and in the "unmarshal(Exchange
>>>>> exchange, InputStream stream)" implementation I set a header, but upon
>>>>> attempting to retrieve the header downstream from the "unmarshal"
>>>>> call, the header is not there.  Since the pattern is inOnly, I added
>>>>> the header to the in Message.  I've done similar with custom
>>>>> Processors and in that case, it works.  What is different with
>>>>> DataFormat?  Is there any way to set header(s) from DataFormat
>>>>> marshal/unmarshal?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Chris
>>>
>>>
