camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chris <cwolf.a...@gmail.com>
Subject Re: AW: How to set a header in custom DataFormat?
Date Fri, 20 Sep 2013 14:55:41 GMT
Jan,

I really appreciate the help, unfortunately your test case does not even 
reproduce the issue I am having.  I think I may not have been clear in 
my original message.

The issue is in a custom DataFormat, if a set a *new* header, that 
header will be gone from the Exchange, for the rest of the route, 
down-stream from the unmarshal (or marshal) call.

So the issue is just setting a header and checking it it's still there. 
So the best way to recreate the problem is to create the most simplest 
DataFormat because marshal/unmarshal is not the issue - the issue is 
setting a new header *inside* the custom marshal/unmarshal methods. So I 
created a test case whose custom DataFormat does the bare minimum - no 
file I/O, no serialization - just string manipulation.

Maybe you and/or other can have a look and explain why there's the issue.

Thanks,

Chris

(the code formatting will be wrecked by the mailing list line length limit)

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;


import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;


public class SetHeaderDemo extends CamelTestSupport {

     @Test
     public void setHeaderInDataFormatProblem() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
         template.sendBodyAndHeader("direct:start", "one\ntwo\nthree\n", 
"DEMO_HEADER", "Hello...");

         // This passes
         assertExpression(mock.getReceivedExchanges().get(0),
                 "simple", "${in.header.DEMO_HEADER}", "Hello...");

         // This FAILS... WHY??? *************
         assertExpression(mock.getReceivedExchanges().get(0),
             "simple", "${in.header.DISAPPEARING_DEMO_HEADER}", "Will it 
be set?");

         assertMockEndpointsSatisfied();
     }


     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {

         return new RouteBuilder() {
             @Override
             public void configure() {
                 from("direct:start")
                 .unmarshal("customFmt") // <== will try to add a new 
header to Exchange IN msg
 
.to("log://dataformat.demo?showAll=true&multiline=true&level=INFO") // 
<== not there
                 .to("mock:result");
             }
         };
     }

     @Override
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry registry = super.createRegistry();

         CustomDataFormat fmt = new CustomDataFormat();
         registry.bind("customFmt", fmt);
         return registry;
     }
}

/**
  * Simplest, contrived DataFormat impl to demonstrate that it's impossible
  * to set a new header in the DataFormat's implementation methods and
  * see the newly added header down-stream from the marshal/unmarshal
  * call(s).
  *
  */
class CustomDataFormat implements DataFormat {

     /**
      * Expects the body to be a newline-delimited list of strings,
      * which will be unmarshalled to a string array, whose elements
      * are the "lines" in the "document".
      *
      * Obviously the marshal/unmarshal process is not important - the
      * issue is that if a new header is added in the DataFormat marshal or
      * unmarshal - it will be GONE after returning.
      */
     @Override
     public Object unmarshal(Exchange exchange, InputStream stream)
             throws Exception {
         List<String> result = new ArrayList<String>();

         BufferedReader in
             = new BufferedReader(new InputStreamReader(stream));

         String line = null;
         while((line = in.readLine()) != null) {
             result.add(line);
         }

         exchange.getIn().setHeader("DISAPPEARING_DEMO_HEADER", "Will it 
be set?");
         return result;
     }

     @Override
     public void marshal(Exchange exchange, Object graph, OutputStream 
stream)
             throws Exception {
         throw new UnsupportedOperationException("Not implemented.");
     }
}




On 9/19/2013 4:13 AM, Jan Matèrne (jhm) wrote:
> I tried building my own DF and that works (for me)
>
> Jan
>
>
> package org.apache.camel.dataformat;
>
> import java.io.File;
> import java.io.FileInputStream;
> import java.io.FileNotFoundException;
> import java.io.FileOutputStream;
> import java.io.InputStream;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
> import java.io.OutputStream;
> import java.util.HashMap;
> import java.util.Map;
>
> import javax.activation.DataHandler;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Message;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.ExchangeBuilder;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultMessage;
> import org.apache.camel.spi.DataFormat;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.apache.camel.util.FileUtil;
> import org.apache.commons.io.IOUtils;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
>
> public class SaveHeaderTest extends CamelTestSupport {
> 	
> 	private static File dataFile = new File("output/message.ser");
> 	
> 	DataFormat customDataFormat = new CustomDataFormat();
> 	
> 	@After
> 	@Before
> 	public void cleanup() {
> 		FileUtil.deleteFile(dataFile);
> 	}
> 	
> 	@SuppressWarnings("unchecked")
> 	@Test
> 	public void save() throws FileNotFoundException, Exception {
> 		File writtenTo = new File("output/message.ser");
> 		assertFalse(writtenTo.exists());
> 		Exchange exchange = ExchangeBuilder.anExchange(context)
> 				.withBody("Hello World")
> 				.withHeader("from", "Apache Camel")
> 				.withHeader("test", "save")
> 				.withHeader(Exchange.FILE_NAME,
> "message.ser")
> 				.build();
> 		template.send("direct:save", exchange);
> 		assertTrue(writtenTo.exists());
>
> 		// actual
> 		ObjectInputStream in = new ObjectInputStream(new
> FileInputStream(dataFile));
> 		Object body = in.readObject();
> 		Map<String,Object> headers = (Map<String, Object>)
> in.readObject();
> 		Map<String,DataHandler> attachments = (Map<String,
> DataHandler>) in.readObject();
> 		String messageId = (String) in.readObject();
> 		boolean fault = (boolean) in.readObject();
> 		
> 		assertEquals("Hello World", body);
> 		assertEquals("Apache Camel", headers.get("from"));
> 		assertEquals("save", headers.get("test"));
> 		assertTrue(attachments.isEmpty());
> 		assertEquals(exchange.getIn().getMessageId(), messageId);
> 		assertFalse(fault);
> 	}
> 	
> 	@Test
> 	public void load() throws Exception {
> 		// create test data
> 		ObjectOutputStream out = new ObjectOutputStream(new
> FileOutputStream(dataFile));
> 		out.writeObject("Hello World");
> 		
> 		Map<String,Object> headers = new HashMap<String, Object>();
> 		headers.put("from", "Apache Camel");
> 		headers.put("test", "load");
> 		out.writeObject(headers);
> 		
> 		Map<String,DataHandler> attachments = new
> HashMap<String,DataHandler>();
> 		out.writeObject(attachments);
> 		
> 		out.writeObject("message-id");
> 		out.writeObject(false);
> 		out.close();
> 		
> 		// load the saved 'exchange'
> 		Exchange loadCommand = ExchangeBuilder.anExchange(context)
> 				.withBody(dataFile.getAbsolutePath())
> 				.build();
> 		Exchange response = template.send("direct:load",
> loadCommand);
> 		assertEquals("Hello World",
> response.getOut().getBody(String.class));
> 		assertEquals("Apache Camel",
> response.getOut().getHeader("from"));
> 		assertEquals("load", response.getOut().getHeader("test"));
> 	}
> 	
> 	@Test
> 	public void turnaround() {
> 		Exchange exchange = ExchangeBuilder.anExchange(context)
> 				.withBody("Hello World")
> 				.withHeader("from", "Apache Camel")
> 				.build();
> 		Exchange response = template.send("direct:turnaround",
> exchange);
> 		assertEquals("Hello World",
> response.getOut().getBody(String.class));
> 		assertEquals("Apache Camel",
> response.getOut().getHeader("from"));
> 	}
> 	
> 	
> 	@Override
> 	protected RouteBuilder createRouteBuilder() throws Exception {
> 		return new RouteBuilder() {
> 			@Override
> 			public void configure() throws Exception {
> 				from("direct:save")
> 					.marshal(customDataFormat)
> 					.to("file:output");
> 				from("direct:load")
> 					.process(openFile())
> 					.unmarshal(customDataFormat);
> 				from("direct:turnaround")
> 					.marshal(customDataFormat)
> 					.unmarshal(customDataFormat);
> 			}
>
> 			private Processor openFile() {
> 				return new Processor() {
> 					@Override
> 					public void process(Exchange
> exchange) throws Exception {
> 						String filename =
> exchange.getIn().getBody(String.class);
> 						FileInputStream stream = new
> FileInputStream(filename);
> 	
> exchange.getIn().setBody(stream);
> 					}
> 				};
> 			}
> 		};
> 	}
> 	
> 	class CustomDataFormat implements DataFormat {
> 		
> 		@Override
> 		public void marshal(Exchange exchange, Object graph,
> OutputStream stream)
> 				throws Exception {
> 			ObjectOutputStream out = null;
> 			
> 			try {
> 				out = new ObjectOutputStream(stream);
> 				// Save the body
> 				out.writeObject(graph);
> 				// Save the message
> 				Message message = exchange.getIn();
> 				if (message != null) {
> 					// Message is not serializable, so
> save the data individually
> 	
> out.writeObject(message.getHeaders());
> 	
> out.writeObject(message.getAttachments());
> 	
> out.writeObject(message.getMessageId());
> 					out.writeObject(message.isFault());
> 				}
> 			} finally {
> 				IOUtils.closeQuietly(out);
> 			}
> 		}
>
> 		@SuppressWarnings("unchecked")
> 		@Override
> 		public Object unmarshal(Exchange exchange, InputStream
> stream)
> 				throws Exception {
> 			
> 			ObjectInputStream in = null;
> 			try {
> 				in = new ObjectInputStream(stream);
> 				
> 				// read the raw data
> 				Object body = in.readObject();
> 				Map<String,Object> headers = (Map<String,
> Object>) in.readObject();
> 				Map<String,DataHandler> attachments =
> (Map<String, DataHandler>) in.readObject();
> 				String messageId = (String) in.readObject();
> 				boolean fault = (boolean) in.readObject();
> 				
> 				// build the message
> 				Message msg = new DefaultMessage();
> 				msg.setBody(body);
> 				msg.setAttachments(attachments);
> 				msg.setHeaders(headers);
> 				msg.setMessageId(messageId);
> 				msg.setFault(fault);
> 				
> 				return msg;
> 			} finally {
> 				IOUtils.closeQuietly(in);
> 			}
> 		}
> 	}
> 	
> }
>
>> -----Ursprüngliche Nachricht-----
>> Von: Jan Matèrne (jhm) [mailto:apache@materne.de]
>> Gesendet: Donnerstag, 19. September 2013 07:06
>> An: users@camel.apache.org
>> Betreff: AW: How to set a header in custom DataFormat?
>>
>> What I have found is, that the exchange object which is passed to the
>> DataFormat is only used for getting the CamelContext.
>> I searched a little bit further and found a processor for
>> unmarshalling:
>>
>> org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
>> AsyncCallback)
>>
>> And there is a note
>>
>>    Object result = dataFormat.unmarshal(exchange, stream);
>>      if (result instanceof Exchange) {
>>        if (result != exchange) {
>>          // it's not allowed to return another exchange other than the
>> one provided to dataFormat
>>          throw new RuntimeCamelException("The returned exchange " +
>> result + " is not the same as " + exchange + " provided to the
>> DataFormat");
>>
>>
>> The logic of that processor is:
>> - get the object from the stream
>> - if it is an exchange, throw that exception
>> - if it is a message, store as 'out' on the exchange-parameter
>> - if it is something else, store it as body of the 'out' message
>>
>>
>> So have you tried setting the header on the 'out' message?
>>
>>
>> Jan
>>
>>
>>
>>> -----Ursprüngliche Nachricht-----
>>> Von: Chris [mailto:cwolf.algo@gmail.com]
>>> Gesendet: Mittwoch, 18. September 2013 22:58
>>> An: users@camel.apache.org
>>> Betreff: How to set a header in custom DataFormat?
>>>
>>> I implemented a custom DataFormat and in the "unmarshal(Exchange
>>> exchange, InputStream stream)" implementation I set a header, but
>> upon
>>> attempting to retrieve the header downstream from the "unmarshal"
>>> call, the header is not there.  Since the pattern is inOnly, I added
>>> the header to the in Message.  I've done similar with custom
>>> Processors and in that case, it works.  What is different with
>>> DataFormat?  Is there any way to set header(s) from DataFormat
>> marshal/unmarshal?
>>>
>>> Thanks,
>>>
>>> Chris
>
>

Mime
View raw message