storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From saurabh mimani <mimani.saur...@gmail.com>
Subject Re: Storm using Kafka Spout gives error: IllegalStateException
Date Thu, 13 Dec 2018 06:51:05 GMT
Hey Stig,

Thanks for looking into this and providing a fix. We have build the
storm-kafka-client jar from your branch and it is working fine on that. We
are still testing it and will let you know if there are any issues we fix.

I see for STORM-3301 <https://issues.apache.org/jira/browse/STORM-3301>,
you have put the fix version as 1.2.3, any approximate date when will this
be released?

Thanks for your help :)





Best Regards

Saurabh Kumar Mimani




On Mon, Dec 10, 2018 at 1:12 AM Stig Rohde Døssing <stigdoessing@gmail.com>
wrote:

> I'm assuming you applied the fix on top of 1.2.1 or something like that?
> The exception can't be thrown from the branch I linked, since it was
> removed in an earlier commit in 1.x-branch.
>
> Your logs show that the committed offset for partition 6 is 1682098 (98
> for short). 98 was emitted, since it shows up in the emitted list. I'm
> guessing it failed and was replayed. 99 and up are in the acked list, so
> they are ready to commit as soon as 98 finishes processing.
>
> The log shows that 99 is the tuple encountering the exception, so I'm
> guessing what happened is that 98 was acked and the spout decided to commit
> 98, 99 etc. For some reason it then still decides to emit 99. The only
> reasons I can think of (barring bugs in Kafka/the Kafka client) for that to
> happen would be that 99 is in waitingToEmit and isn't being cleared out
> during the commit (this is the bug I tried to fix), somehow 99 is still
> queued for retry (this should not be possible) or for some reason the
> consumer position ends up below the committed offset. I think the best bet
> for tracking down why it happens would be logging the contents of the
> RetryService, the contents of waitingToEmit and the consumer position both
> after commitOffsetsForAckedTuples, and right before the exception is
> thrown.
>
> Could you try logging those? I can add the log statements on top of the
> bugfix if needed.
>
> Den søn. 9. dec. 2018 kl. 18.42 skrev saurabh mimani <
> mimani.saurabh@gmail.com>:
>
>> Hey, I see this is still happening, this time, it seems, as it seemed to
>> me, because same offset from different partition was committed(guessing
>> from logs), but not sure as that should be handled.
>>
>> Please find the logs here
>> <https://gist.github.com/mimani/ff27b7272482efc91c4d145d59ab59be>.
>>
>>
>>
>> Best Regards
>>
>> Saurabh Kumar Mimani
>>
>>
>>
>>
>> On Sun, Dec 9, 2018 at 3:19 AM Stig Rohde Døssing <stigdoessing@gmail.com>
>> wrote:
>>
>>> I believe I have a fix, your logs were helpful. Please try out the
>>> changes in https://github.com/apache/storm/pull/2923/files.
>>>
>>> Den lør. 8. dec. 2018 kl. 07.25 skrev saurabh mimani <
>>> mimani.saurabh@gmail.com>:
>>>
>>>> Hey,
>>>>
>>>> Thanks for looking into this. I was not able to produce this earlier on
>>>> my local, however I will again try once. I was consistently able to
>>>> reproduce it with parallelism of 5 for boltA and parallelism of 200 with
>>>> boltB on 2 machines in cluster mode.
>>>>
>>>> I will try again with your code once.
>>>>
>>>> These <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8>
are
>>>> logs of Kafka Spout, when I was able to reproduce it in cluster mode with
>>>> my topology, in case these helps.
>>>>
>>>>
>>>>
>>>> Best Regards
>>>>
>>>> Saurabh Kumar Mimani
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <
>>>> stigdoessing@gmail.com> wrote:
>>>>
>>>>> I can't reproduce this.
>>>>>
>>>>> I created a test topology similar to the code you posted, based on the
>>>>> 1.2.1 release tag
>>>>> https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564
>>>>> .
>>>>>
>>>>> I set up a local Kafka instance and put enough messages to run the
>>>>> topology for 15 minutes or so in the test topic. After populating the
>>>>> topic, I started the topology and let it run until it reached the end
of
>>>>> the topic. As expected a lot of messages failed, but after a while it
>>>>> managed to successfully process all messages. I didn't see any worker
>>>>> crashes, and the logs only show some errors related to moving files
>>>>> (expected on Windows).
>>>>>
>>>>> The topology seems to work fine against both Kafka 0.10.2.2 and 1.1.1,
>>>>> though 1.1.1 is slower due to
>>>>> https://issues.apache.org/jira/browse/STORM-3102.
>>>>>
>>>>> The Kafka setup was with the default configuration for both Kafka and
>>>>> Zookeeper, and Storm was set up with a local Nimbus, single local
>>>>> Supervisor and 4 worker slots.
>>>>>
>>>>> Saurabh please try to reproduce the issue using the topology I linked.
>>>>> If you need to make adjustments in order to provoke the issue, please
>>>>> update the code and link it so I can check it out and try it.
>>>>>
>>>>> Den ons. 5. dec. 2018 kl. 16.42 skrev saurabh mimani <
>>>>> mimani.saurabh@gmail.com>:
>>>>>
>>>>>> No, I have checked that, there is no other consumer group consuming
>>>>>> from the same.
>>>>>>
>>>>>> Thanks for looking into it, let me know if you need any
>>>>>> other information.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best Regards
>>>>>>
>>>>>> Saurabh Kumar Mimani
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <
>>>>>> stigdoessing@gmail.com> wrote:
>>>>>>
>>>>>>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples
to
>>>>>>> the input tuple. It's intended for bolts that just need to receive
a tuple,
>>>>>>> synchronously process it and emit some new tuples anchored to
the input
>>>>>>> tuple. It's there because doing manual acking is tedious and
error-prone in
>>>>>>> cases where you don't need to be able to e.g. emit new unachored
tuples or
>>>>>>> ack the input tuple asynchronously. As Peter mentioned, the
>>>>>>> BasicBoltExecutor (
>>>>>>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42)
>>>>>>> handles acking for you.
>>>>>>>
>>>>>>> Saurabh, I'll see if I can reproduce your issue. Please also
check
>>>>>>> that you don't have any other consumers using the same consumer
group as
>>>>>>> the spout.
>>>>>>>
>>>>>>> Den ons. 5. dec. 2018 kl. 11.53 skrev Peter Chamberlain <
>>>>>>> peter.chamberlain@htk.co.uk>:
>>>>>>>
>>>>>>>> Pretty sure that the ack path is handled by BasicBoltExecutor
(an
>>>>>>>> implmentation of IRichBolt), which calls collector.setContext(input),
and
>>>>>>>> also does the acking inside it's execute function, and in
between calls the
>>>>>>>> BaseBasicBolt.execute version (which takes the collector
as well as the
>>>>>>>> tuple as parameters).
>>>>>>>> So the intention is clearly that it is automatically anchored
and
>>>>>>>> acknowledged.
>>>>>>>>
>>>>>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <ping2ravi@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Saurabh,
>>>>>>>>> I checked the BaseBasicBolt which comes with storm, it
doesn't do
>>>>>>>>> much.
>>>>>>>>> Also checked BasicOutputCollector and don't see how it
will
>>>>>>>>> anchor automatically unless you call
>>>>>>>>>  BasicOutputCollector.setContext(Tuple tuple), don't
see all of your code,
>>>>>>>>> but don't see this call in your boltA code.
>>>>>>>>> Also it looks like even when you make this setContext
call, after
>>>>>>>>> that you will have to emit using following emit function
>>>>>>>>>
>>>>>>>>> BasicOutputCollector.emit(String streamId, List<Object>
tuples)
>>>>>>>>>
>>>>>>>>> Basically just check that whatever emit function is called,
it
>>>>>>>>> does pass the input tuple.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Once I had exactly same issue for few days, but mine
was related
>>>>>>>>> to config. I wanted to read from two Kafka topics in
one topology and had
>>>>>>>>> two different kafkaspout created, mistakenly I copy pasted
same config and
>>>>>>>>> that caused this issue. Not sure if that applies to your
scenario.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *NOTE*: I checked the latest storm master branch for
code.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani <mimani.saurabh@gmail.com
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Ravi,
>>>>>>>>>>
>>>>>>>>>> I am using *BaseBasicBolt*, which as described here
>>>>>>>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>
>>>>>>>>>> : Tuples emitted to BasicOutputCollector are automatically
>>>>>>>>>> anchored to the input tuple, and the input tuple
is acked for you
>>>>>>>>>> automatically when the execute method completes.
>>>>>>>>>>
>>>>>>>>>> What you are saying is applicable for *BaseRichBolt.
*The Kafka
>>>>>>>>>> spout I am using is from storm-kafka-client
>>>>>>>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1>
>>>>>>>>>> library, so unique ID, etc should be already taken
care of.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <ping2ravi@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>> I think there is issue with part of code which
is emitting the
>>>>>>>>>>> tuples.
>>>>>>>>>>>
>>>>>>>>>>> If you want to use ack mechanism, you need to
use anchor tuple
>>>>>>>>>>> when you emit from bolts.
>>>>>>>>>>>
>>>>>>>>>>> Collector.emit(Tuple input, Values data)
>>>>>>>>>>>
>>>>>>>>>>> Also make sure Kafka spout emits tuple with a
unique id.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Ravi
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani <
>>>>>>>>>>> mimani.saurabh@gmail.com wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your reply. What you are saying
is correct, However
>>>>>>>>>>>> I am able to reproduce it more often and
I think it happens when multiple
>>>>>>>>>>>> tuples gets failed in first run but all of
those gets success on retry,
>>>>>>>>>>>> something of that sort is happening.
>>>>>>>>>>>>
>>>>>>>>>>>> This can be reproduced using following two
bolts and kafkaSpout
>>>>>>>>>>>> easily, by running in cluster more with 3/4
minutes:
>>>>>>>>>>>>
>>>>>>>>>>>> *BoltA*
>>>>>>>>>>>>
>>>>>>>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>>>>>>>
>>>>>>>>>>>> class BoltA  extends BaseBasicBolt {
>>>>>>>>>>>>
>>>>>>>>>>>>   override def execute(input: Tuple, collector:
BasicOutputCollector): Unit = {
>>>>>>>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>>>>>>>     val randomGenerator = new Random()
>>>>>>>>>>>>
>>>>>>>>>>>>     var i = 0
>>>>>>>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>>>>>>>     1 to 100 foreach {
>>>>>>>>>>>>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>>>>>>>>>>>>       i += 1
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   override def declareOutputFields(declarer:
OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> *BoltB*
>>>>>>>>>>>>
>>>>>>>>>>>> class BoltB  extends BaseBasicBolt {
>>>>>>>>>>>>
>>>>>>>>>>>>   override def execute(input: Tuple, collector:
BasicOutputCollector): Unit = {
>>>>>>>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>>>>>>>     println(s"Received ${abc.index}th tuple
in BoltB")
>>>>>>>>>>>>     if(abc.index >= 97 && abc.rand){
>>>>>>>>>>>>       println(s"throwing FailedException
for ${abc.index}th tuple for")
>>>>>>>>>>>>       throw new FailedException()
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   override def declareOutputFields(declarer:
OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> *KafkaSpout:*
>>>>>>>>>>>>
>>>>>>>>>>>> private def getKafkaSpoutConfig(source: Config)
= KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
>>>>>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG,
"grp")
>>>>>>>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>>>>>     .setOffsetCommitPeriodMs(100)
>>>>>>>>>>>>     .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>       10,
>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>>>>>>>     ))
>>>>>>>>>>>>     .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy",
"UNCOMMITTED_EARLIEST")))
>>>>>>>>>>>>     .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset",
10000))
>>>>>>>>>>>>     .build()
>>>>>>>>>>>>
>>>>>>>>>>>> Other config:
>>>>>>>>>>>>
>>>>>>>>>>>> messageTimeoutInSecons: 300
>>>>>>>>>>>>
>>>>>>>>>>>> [image: Screenshot 2018-12-05 at 12.03.08
PM.png]
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde
Døssing <
>>>>>>>>>>>> stigdoessing@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>>>
>>>>>>>>>>>>> The tuple emitted by the spout will only
be acked once all
>>>>>>>>>>>>> branches of the tuple tree have been
acked, i.e. all 100 tuples are acked.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The error you're seeing was added as
part of
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-2666
to try to
>>>>>>>>>>>>> avoid having that bug pop up again. Could
you try posting your spout
>>>>>>>>>>>>> configuration? Also if possible, it would
be helpful if you could enable
>>>>>>>>>>>>> debug logging for org.apache.storm.kafka.spout.KafkaSpout
and
>>>>>>>>>>>>> maybe also org.apache.storm.kafka.spout.internal.OffsetManager.
>>>>>>>>>>>>> They log when offsets are committed (e.g.
>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546),
>>>>>>>>>>>>> and also when the consumer position is
changed (e.g.
>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561
>>>>>>>>>>>>> ). It should be possible to track down
when/why the consumer position wound
>>>>>>>>>>>>> up behind the committed offset.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Just so you're aware, the check that
crashes the spout has
>>>>>>>>>>>>> been removed as of
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-3102.
I'd still
>>>>>>>>>>>>> like to know if there's a bug in the
spout causing it to emit tuples that
>>>>>>>>>>>>> were already committed though.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Den man. 3. dec. 2018 kl. 11.29 skrev
saurabh mimani <
>>>>>>>>>>>>> mimani.saurabh@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Version Info:
>>>>>>>>>>>>>>    "org.apache.storm" % "storm-core"
% "1.2.1"
>>>>>>>>>>>>>>    "org.apache.storm" % "storm-kafka-client"
% "1.2.1"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a storm topology which looks
like following:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> boltA -> boltB -> boltC ->
boltD
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> boltA just does some formatting of
requests and emits
>>>>>>>>>>>>>> another tuple. boltB does some processing
and emits around
>>>>>>>>>>>>>> 100 tuples for each tuple being received.
boltC and boltD processes
>>>>>>>>>>>>>> these tuples. All the bolts implements
BaseBasicBolt.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What I am noticing is whenever boltD
marks some tuple as
>>>>>>>>>>>>>> fail and marks that for retry by
throwing FailedException,
>>>>>>>>>>>>>> After a few minutes less than my
topology timeout, I get the following
>>>>>>>>>>>>>> error:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util
[ERROR] Async loop died!
>>>>>>>>>>>>>> java.lang.IllegalStateException:
Attempting to emit a message that has already been committed. This should never occur when
using the at-least-once processing guarantee.
>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471)
~[stormjar.jar:?]
>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440)
~[stormjar.jar:?]
>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
~[stormjar.jar:?]
>>>>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22)
[clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
[?:1.8.0_60]
>>>>>>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor
[ERROR]
>>>>>>>>>>>>>> java.lang.IllegalStateException:
Attempting to emit a message that has already been committed. This should never occur when
using the at-least-once processing guarantee.
>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471)
~[stormjar.jar:?]
>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440)
~[stormjar.jar:?]
>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
~[stormjar.jar:?]
>>>>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22)
[clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
[?:1.8.0_60]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What seems to be happening is this
happens when boltB emits
>>>>>>>>>>>>>> 100 out of 1 tuple and boltDfails
one of the tuples out of
>>>>>>>>>>>>>> those 100 tuples, I am getting this
error. Not able to understand how to
>>>>>>>>>>>>>> fix this, ideally it should ack an
original tuple when all
>>>>>>>>>>>>>> 100 tuples are acked, but probably
an original tuple is acked before all
>>>>>>>>>>>>>> those 100 tuples are acked, which
causes this error.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Peter Chamberlain* | Senior Software Engineer | HTK
>>>>>>>>
>>>>>>>> T: +44(0)870 600 2311
>>>>>>>> Connect with me: Email <peter.chamberlain@htk.co.uk>
>>>>>>>>
>>>>>>>>
>>>>>>>> [image: htk logo] <http://www.htk.co.uk/>
>>>>>>>>
>>>>>>>> Connect with HTK: htk.co.uk <http://www.htk.co.uk/>
| LinkedIn
>>>>>>>> <http://www.linkedin.com/company/htk/> | Twitter
>>>>>>>> <http://www.twitter.com/htkhorizon>
>>>>>>>>
>>>>>>>> HTK Limited, Chapmans Warehouse, Wherry Quay, Ipswich, IP4
1AS, UK.
>>>>>>>> Company Registered in England and Wales as 3191677, VAT Number
675
>>>>>>>> 9467 71
>>>>>>>>
>>>>>>>>
>>>>>>>> PLEASE CONSIDER THE ENVIRONMENT BEFORE PRINTING THIS EMAIL.
>>>>>>>> This email is only for the use of the intended recipients
and may
>>>>>>>> contain privileged information. If you’ve received this
email in error,
>>>>>>>> please let the sender know; then delete the message. The
views
>>>>>>>> expressed in this email represent those of the sender and
not necessarily
>>>>>>>> of HTK.
>>>>>>>>
>>>>>>>

Mime
View raw message