camel-users mailing list archives

From Chris <cwolf.a...@gmail.com>
Subject Re: AW: How to set a header in custom DataFormat?
Date Fri, 20 Sep 2013 15:57:45 GMT
Ok, I solved the issue.  After reading the source code of
org.apache.camel.processor.UnmarshalProcessor, I saw this little piece 
of code:

             } else if (result instanceof Message) {
                 // the dataformat has probably set headers, attachments, etc. so let's use it as the outbound payload
                 exchange.setOut((Message) result);

Which means that if a DataFormat implementation wants to set headers
and/or attachments, the unmarshal implementation cannot simply return
the result - it must create a new Message, set the result as the body
of that Message, and return the Message instead.
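
To make that branch easier to follow, here is a minimal sketch of the
dispatch in plain Java. The nested Message, Exchange and DataFormat types
are hypothetical stand-ins, not the Camel classes, and the step that
prepares the out message as a copy of the in message before the data
format runs is an assumption about UnmarshalProcessor rather than
something shown in the excerpt above. Under that assumption, a header put
on the in message inside unmarshal never reaches the out message, while a
returned Message replaces the out message wholesale:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class UnmarshalDispatchSketch {

    // Hypothetical stand-ins for illustration only; NOT the Camel types.
    static class Message {
        Object body;
        final Map<String, Object> headers = new HashMap<>();

        Message copy() {
            Message m = new Message();
            m.body = body;
            m.headers.putAll(headers);
            return m;
        }
    }

    static class Exchange {
        Message in = new Message();
        Message out;
    }

    interface DataFormat {
        Object unmarshal(Exchange exchange, String payload);
    }

    // Mirrors the three branches described in the thread.
    static void process(Exchange exchange, DataFormat format) {
        // ASSUMPTION: out is prepared as a copy of in before unmarshal runs.
        exchange.out = exchange.in.copy();
        Object result = format.unmarshal(exchange, (String) exchange.in.body);
        if (result instanceof Exchange) {
            if (result != exchange) {
                throw new IllegalStateException(
                        "returned exchange is not the one provided");
            }
        } else if (result instanceof Message) {
            // the returned Message becomes the outbound message, headers included
            exchange.out = (Message) result;
        } else {
            // anything else only becomes the body of the existing out message
            exchange.out.body = result;
        }
    }

    // Returns the parsed lines directly and sets the header on the in message.
    static DataFormat plainResult() {
        return (exchange, payload) -> {
            exchange.in.headers.put("DISAPPEARING_DEMO_HEADER", "Will it be set?");
            return Arrays.asList(payload.split("\n"));
        };
    }

    // Wraps the parsed lines in a new Message that carries the header.
    static DataFormat wrappedResult() {
        return (exchange, payload) -> {
            Message msg = exchange.in.copy();
            msg.body = Arrays.asList(payload.split("\n"));
            msg.headers.put("DISAPPEARING_DEMO_HEADER", "Will it be set?");
            return msg;
        };
    }

    // True if the new header is visible on the out message after processing.
    static boolean headerSurvives(DataFormat format) {
        Exchange exchange = new Exchange();
        exchange.in.body = "one\ntwo\nthree\n";
        exchange.in.headers.put("DEMO_HEADER", "Hello...");
        process(exchange, format);
        return exchange.out.headers.containsKey("DISAPPEARING_DEMO_HEADER");
    }

    public static void main(String[] args) {
        System.out.println(headerSurvives(plainResult()));   // false
        System.out.println(headerSurvives(wrappedResult())); // true
    }
}
```

In this model the pre-existing DEMO_HEADER survives in both cases because
it is on the in message before the copy is taken; only the header added
during unmarshal depends on which kind of object is returned.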

I have to say, I have never seen this documented anywhere, nor any
example of it in the book.  Of course I understand that there are a lot
of higher priority items to do, so I'm not complaining ;)

Well here's the revised sample DataFormat where you *can* successfully 
set new headers in the unmarshal implementation:

Regards,

Chris


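// In addition to the imports in the test case quoted below, this needs:
import org.apache.camel.Message;

class CustomDataFormat2 implements DataFormat {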

     @Override
     public Object unmarshal(Exchange exchange, InputStream stream)
             throws Exception {

         // To be able to set new headers and/or attachments, etc.
         // we need to wrap the result in a new Message and return
         // the new Message instead... who knew?
         Message msg = exchange.getIn().copy();

         List<String> result = new ArrayList<String>();

         BufferedReader in
             = new BufferedReader(new InputStreamReader(stream));

         String line = null;
         while((line = in.readLine()) != null) {
             result.add(line);
         }

         msg.setBody(result);

         // now it works!
         msg.setHeader("DISAPPEARING_DEMO_HEADER", "Will it be set?");
         return msg;
     }

     @Override
     public void marshal(Exchange exchange, Object graph,
             OutputStream stream) throws Exception {
         throw new UnsupportedOperationException("Not implemented.");
     }
}


On 9/20/2013 10:55 AM, Chris wrote:
> Jan,
>
> I really appreciate the help, unfortunately your test case does not even
> reproduce the issue I am having.  I think I may not have been clear in
> my original message.
>
> The issue is that in a custom DataFormat, if I set a *new* header, that
> header will be gone from the Exchange for the rest of the route,
> downstream from the unmarshal (or marshal) call.
>
> So the issue is just setting a header and checking if it's still there.
> So the best way to recreate the problem is to create the simplest
> DataFormat because marshal/unmarshal is not the issue - the issue is
> setting a new header *inside* the custom marshal/unmarshal methods. So I
> created a test case whose custom DataFormat does the bare minimum - no
> file I/O, no serialization - just string manipulation.
>
> Maybe you and/or others can have a look and explain why there's the issue.
>
> Thanks,
>
> Chris
>
> (the code formatting will be wrecked by the mailing list line length limit)
>
> import java.io.BufferedReader;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> import java.io.OutputStream;
> import java.util.ArrayList;
> import java.util.List;
>
>
> import org.apache.camel.Exchange;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.impl.JndiRegistry;
> import org.apache.camel.spi.DataFormat;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Test;
>
>
> public class SetHeaderDemo extends CamelTestSupport {
>
>      @Test
>      public void setHeaderInDataFormatProblem() throws Exception {
>          MockEndpoint mock = getMockEndpoint("mock:result");
>          mock.expectedMessageCount(1);
>          template.sendBodyAndHeader("direct:start", "one\ntwo\nthree\n",
>                  "DEMO_HEADER", "Hello...");
>
>          // This passes
>          assertExpression(mock.getReceivedExchanges().get(0),
>                  "simple", "${in.header.DEMO_HEADER}", "Hello...");
>
>          // This FAILS... WHY??? *************
>          assertExpression(mock.getReceivedExchanges().get(0),
>                  "simple", "${in.header.DISAPPEARING_DEMO_HEADER}",
>                  "Will it be set?");
>
>          assertMockEndpointsSatisfied();
>      }
>
>
>      @Override
>      protected RouteBuilder createRouteBuilder() throws Exception {
>
>          return new RouteBuilder() {
>              @Override
>              public void configure() {
>                  from("direct:start")
>                  // <== will try to add a new header to Exchange IN msg
>                  .unmarshal("customFmt")
>                  // <== not there
>                  .to("log://dataformat.demo?showAll=true&multiline=true&level=INFO")
>                  .to("mock:result");
>              }
>          };
>      }
>
>      @Override
>      protected JndiRegistry createRegistry() throws Exception {
>          JndiRegistry registry = super.createRegistry();
>
>          CustomDataFormat fmt = new CustomDataFormat();
>          registry.bind("customFmt", fmt);
>          return registry;
>      }
> }
>
> /**
>   * Simplest, contrived DataFormat impl to demonstrate that it's impossible
>   * to set a new header in the DataFormat's implementation methods and
>   * see the newly added header down-stream from the marshal/unmarshal
>   * call(s).
>   *
>   */
> class CustomDataFormat implements DataFormat {
>
>      /**
>       * Expects the body to be a newline-delimited list of strings,
>       * which will be unmarshalled to a string array, whose elements
>       * are the "lines" in the "document".
>       *
>       * Obviously the marshal/unmarshal process is not important - the
>       * issue is that if a new header is added in the DataFormat marshal or
>       * unmarshal - it will be GONE after returning.
>       */
>      @Override
>      public Object unmarshal(Exchange exchange, InputStream stream)
>              throws Exception {
>          List<String> result = new ArrayList<String>();
>
>          BufferedReader in
>              = new BufferedReader(new InputStreamReader(stream));
>
>          String line = null;
>          while((line = in.readLine()) != null) {
>              result.add(line);
>          }
>
>          exchange.getIn().setHeader("DISAPPEARING_DEMO_HEADER",
>                  "Will it be set?");
>          return result;
>      }
>
>      @Override
>      public void marshal(Exchange exchange, Object graph,
>              OutputStream stream) throws Exception {
>          throw new UnsupportedOperationException("Not implemented.");
>      }
> }
>
>
>
>
> On 9/19/2013 4:13 AM, Jan Matèrne (jhm) wrote:
>> I tried building my own DF and that works (for me)
>>
>> Jan
>>
>>
>> package org.apache.camel.dataformat;
>>
>> import java.io.File;
>> import java.io.FileInputStream;
>> import java.io.FileNotFoundException;
>> import java.io.FileOutputStream;
>> import java.io.InputStream;
>> import java.io.ObjectInputStream;
>> import java.io.ObjectOutputStream;
>> import java.io.OutputStream;
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> import javax.activation.DataHandler;
>>
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Message;
>> import org.apache.camel.Processor;
>> import org.apache.camel.builder.ExchangeBuilder;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.impl.DefaultMessage;
>> import org.apache.camel.spi.DataFormat;
>> import org.apache.camel.test.junit4.CamelTestSupport;
>> import org.apache.camel.util.FileUtil;
>> import org.apache.commons.io.IOUtils;
>> import org.junit.After;
>> import org.junit.Before;
>> import org.junit.Test;
>>
>> public class SaveHeaderTest extends CamelTestSupport {
>>
>>     private static File dataFile = new File("output/message.ser");
>>
>>     DataFormat customDataFormat = new CustomDataFormat();
>>
>>     @After
>>     @Before
>>     public void cleanup() {
>>         FileUtil.deleteFile(dataFile);
>>     }
>>
>>     @SuppressWarnings("unchecked")
>>     @Test
>>     public void save() throws FileNotFoundException, Exception {
>>         File writtenTo = new File("output/message.ser");
>>         assertFalse(writtenTo.exists());
>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>                 .withBody("Hello World")
>>                 .withHeader("from", "Apache Camel")
>>                 .withHeader("test", "save")
>>                 .withHeader(Exchange.FILE_NAME, "message.ser")
>>                 .build();
>>         template.send("direct:save", exchange);
>>         assertTrue(writtenTo.exists());
>>
>>         // actual
>>         ObjectInputStream in = new ObjectInputStream(
>>                 new FileInputStream(dataFile));
>>         Object body = in.readObject();
>>         Map<String,Object> headers =
>>                 (Map<String, Object>) in.readObject();
>>         Map<String,DataHandler> attachments =
>>                 (Map<String, DataHandler>) in.readObject();
>>         String messageId = (String) in.readObject();
>>         boolean fault = (boolean) in.readObject();
>>
>>         assertEquals("Hello World", body);
>>         assertEquals("Apache Camel", headers.get("from"));
>>         assertEquals("save", headers.get("test"));
>>         assertTrue(attachments.isEmpty());
>>         assertEquals(exchange.getIn().getMessageId(), messageId);
>>         assertFalse(fault);
>>     }
>>
>>     @Test
>>     public void load() throws Exception {
>>         // create test data
>>         ObjectOutputStream out = new ObjectOutputStream(
>>                 new FileOutputStream(dataFile));
>>         out.writeObject("Hello World");
>>
>>         Map<String,Object> headers = new HashMap<String, Object>();
>>         headers.put("from", "Apache Camel");
>>         headers.put("test", "load");
>>         out.writeObject(headers);
>>
>>         Map<String,DataHandler> attachments =
>>                 new HashMap<String,DataHandler>();
>>         out.writeObject(attachments);
>>
>>         out.writeObject("message-id");
>>         out.writeObject(false);
>>         out.close();
>>
>>         // load the saved 'exchange'
>>         Exchange loadCommand = ExchangeBuilder.anExchange(context)
>>                 .withBody(dataFile.getAbsolutePath())
>>                 .build();
>>         Exchange response = template.send("direct:load", loadCommand);
>>         assertEquals("Hello World",
>>                 response.getOut().getBody(String.class));
>>         assertEquals("Apache Camel",
>>                 response.getOut().getHeader("from"));
>>         assertEquals("load", response.getOut().getHeader("test"));
>>     }
>>
>>     @Test
>>     public void turnaround() {
>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>                 .withBody("Hello World")
>>                 .withHeader("from", "Apache Camel")
>>                 .build();
>>         Exchange response = template.send("direct:turnaround", exchange);
>>         assertEquals("Hello World",
>>                 response.getOut().getBody(String.class));
>>         assertEquals("Apache Camel",
>>                 response.getOut().getHeader("from"));
>>     }
>>
>>
>>     @Override
>>     protected RouteBuilder createRouteBuilder() throws Exception {
>>         return new RouteBuilder() {
>>             @Override
>>             public void configure() throws Exception {
>>                 from("direct:save")
>>                     .marshal(customDataFormat)
>>                     .to("file:output");
>>                 from("direct:load")
>>                     .process(openFile())
>>                     .unmarshal(customDataFormat);
>>                 from("direct:turnaround")
>>                     .marshal(customDataFormat)
>>                     .unmarshal(customDataFormat);
>>             }
>>
>>             private Processor openFile() {
>>                 return new Processor() {
>>                     @Override
>>                     public void process(Exchange exchange)
>>                             throws Exception {
>>                         String filename =
>>                                 exchange.getIn().getBody(String.class);
>>                         FileInputStream stream =
>>                                 new FileInputStream(filename);
>>                         exchange.getIn().setBody(stream);
>>                     }
>>                 };
>>             }
>>         };
>>     }
>>
>>     class CustomDataFormat implements DataFormat {
>>
>>         @Override
>>         public void marshal(Exchange exchange, Object graph,
>>                 OutputStream stream) throws Exception {
>>             ObjectOutputStream out = null;
>>
>>             try {
>>                 out = new ObjectOutputStream(stream);
>>                 // Save the body
>>                 out.writeObject(graph);
>>                 // Save the message
>>                 Message message = exchange.getIn();
>>                 if (message != null) {
>>                     // Message is not serializable, so save the
>>                     // data individually
>>                     out.writeObject(message.getHeaders());
>>                     out.writeObject(message.getAttachments());
>>                     out.writeObject(message.getMessageId());
>>                     out.writeObject(message.isFault());
>>                 }
>>             } finally {
>>                 IOUtils.closeQuietly(out);
>>             }
>>         }
>>
>>         @SuppressWarnings("unchecked")
>>         @Override
>>         public Object unmarshal(Exchange exchange, InputStream stream)
>>                 throws Exception {
>>
>>             ObjectInputStream in = null;
>>             try {
>>                 in = new ObjectInputStream(stream);
>>
>>                 // read the raw data
>>                 Object body = in.readObject();
>>                 Map<String,Object> headers =
>>                         (Map<String, Object>) in.readObject();
>>                 Map<String,DataHandler> attachments =
>>                         (Map<String, DataHandler>) in.readObject();
>>                 String messageId = (String) in.readObject();
>>                 boolean fault = (boolean) in.readObject();
>>
>>                 // build the message
>>                 Message msg = new DefaultMessage();
>>                 msg.setBody(body);
>>                 msg.setAttachments(attachments);
>>                 msg.setHeaders(headers);
>>                 msg.setMessageId(messageId);
>>                 msg.setFault(fault);
>>
>>                 return msg;
>>             } finally {
>>                 IOUtils.closeQuietly(in);
>>             }
>>         }
>>     }
>>
>> }
>>
>>> -----Original Message-----
>>> From: Jan Matèrne (jhm) [mailto:apache@materne.de]
>>> Sent: Thursday, 19 September 2013 07:06
>>> To: users@camel.apache.org
>>> Subject: AW: How to set a header in custom DataFormat?
>>>
>>> What I have found is that the exchange object which is passed to the
>>> DataFormat is only used for getting the CamelContext.
>>> I searched a little bit further and found a processor for
>>> unmarshalling:
>>>
>>> org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
>>> AsyncCallback)
>>>
>>> And there is a note
>>>
>>>    Object result = dataFormat.unmarshal(exchange, stream);
>>>    if (result instanceof Exchange) {
>>>        if (result != exchange) {
>>>            // it's not allowed to return another exchange other than the one provided to dataFormat
>>>            throw new RuntimeCamelException("The returned exchange " + result
>>>                    + " is not the same as " + exchange + " provided to the DataFormat");
>>>
>>>
>>> The logic of that processor is:
>>> - get the object from the stream
>>> - if it is an exchange other than the one provided, throw that exception
>>> - if it is a message, store as 'out' on the exchange-parameter
>>> - if it is something else, store it as body of the 'out' message
>>>
>>>
>>> So have you tried setting the header on the 'out' message?
>>>
>>>
>>> Jan
>>>
>>>
>>>
>>>> -----Original Message-----
>>>> From: Chris [mailto:cwolf.algo@gmail.com]
>>>> Sent: Wednesday, 18 September 2013 22:58
>>>> To: users@camel.apache.org
>>>> Subject: How to set a header in custom DataFormat?
>>>>
>>>> I implemented a custom DataFormat and in the "unmarshal(Exchange
>>>> exchange, InputStream stream)" implementation I set a header, but upon
>>>> attempting to retrieve the header downstream from the "unmarshal"
>>>> call, the header is not there.  Since the pattern is inOnly, I added
>>>> the header to the in Message.  I've done similar with custom
>>>> Processors and in that case, it works.  What is different with
>>>> DataFormat?  Is there any way to set header(s) from DataFormat
>>>> marshal/unmarshal?
>>>>
>>>> Thanks,
>>>>
>>>> Chris
>>
>>
