Lessons learned from Kafka in production (Tim Berglund, Confluent)

Good, all right. I see some familiar faces, as some of you were in a talk I did before lunch on a similar topic, and I see some faces that were not there, so I'm going to cover some of the same ground in this talk. There will be a little bit of overlap and a lot of different material as well. There was some stuff I went over quickly in the earlier talk that I'll take a little more time with, and we'll focus on some different issues.

My name is Tim Berglund and I work for Confluent, this company here. We're a company that supports Apache Kafka and makes proprietary extensions and things on top of it; we make a streaming data platform based on the open source Apache Kafka. My responsibility at Confluent is that I run the developer relations team. My team does training, so if you ever take a class from us in a public venue, or if you work for a company where we come and teach you, that'll be my team coming to teach you. We make the curriculum, we will pretty soon be revamping the documentation, and we do events like this: evangelism, community, outreach and things like that. That's everything I'm responsible for at Confluent. I've been there about six weeks and am loving it so far. There's all kinds of great stuff to learn and do and build, and it's exciting, so talk to me about that afterwards if you want to hear more.

The title of the talk is Kafka in production, and let me give you a little bit of an outline. I want to begin with the big picture before we dive into specific issues. I want to make sure you've got the big picture about streams and streaming data, and really the Kafka worldview: how it wants you to think about architecture. It's got some strong opinions there. Then we'll dig into Apache Kafka itself. If you were in my earlier talk, I went very quickly over this stuff. I'm going to take a little more time with it here, because these production issues won't make any sense unless we know how Kafka works. I want to make sure
everybody's on the same page. These three production issues are actually taken from real life. These are actual support issues that Confluent support people dealt with, and they are all written up in a blog post on the Confluent website, confluent.io/blog. Just scroll down a little bit; it's not that old of a post, and they're briefly summarized there. So these are totally real-life things, and each one of them helps underscore a part of how Kafka works. They're nice case studies, really. They're not super dramatic detective stories. All the best support calls would make nice movies, right? There's some plot and bad guys and things like that. These are really not like that. They're not terribly dramatic, but they're educationally super useful, at least I think so. So let's walk through all this stuff and see where we get in 45 minutes.

We'll start with streams. Why streaming? Before lunch I talked about this idea, which it seems like we're all born with, that data has to go in a database. That's the right place for it to go: things happen, you process the event, you put it into a schema, and you say, well, here it is, I'm going to stick it in this place, and if we need to know about it we'll go back to that place later and get it. Kafka's opinion is that all of your data is really event streams. This isn't that crazy of an idea, because if you think about it, that's what your software does: you process events. Maybe the event is a request, maybe it's an actual event like a sensor reading or a transaction or something that happens in the business, out in the world. That's really all you do: events happen and you process them. Kafka has this opinion of the world that says, hey, let's just deal with them as events. Let's make them streams of events and not try to turn them into anything else ahead of time. So you could have a request
really. I mean, look at this: this is some JSON. There's a timestamp, there's a user making this request, and it looks like somebody is looking for a product detail for product 5678. Now that's very conventional. Probably everybody in this room has written a service that will parse JSON like that, turn it into a SQL query for the relational database, get the data out of the relational database and send it back over an HTTP connection, right? That's bread and butter, as we say. But that's also an event. It doesn't have to be a request; we can consider it an event.

If you work in any sort of IoT thing, sensor data of course are events. I'm hearing a lot from car companies; they're super interested. All of a sudden cars need to be internet-connected devices, and there are a lot of cars and a lot of sensors in cars, so there are all these events that our cars are generating. Any other kind of sensor, any other kind of IoT thing: thermostats, power meters, whatever they are. There are lots of sensors in the world.

Log files. Now this is where I think streams get really cool. What were the last two things I said? Here's queries, here's sensor data. Those are business things: a user is doing a thing or a device is doing a thing, and that's in our business domain. Logs are an operational thing; this is not in the business domain. Only, I guess, what we'd properly call DevOps people, developers who are programming systems, would care about logs. Well, logs are events, they very much are. This last line here is a new thing that goes on the end of our event queue, and all of the previous entries are immutable. These are classically events. Streaming as an architecture gets exciting when we can begin to integrate operational stuff, which is really in the life of our system, with business stuff, which is in the life of the business. Normally those two things don't meet. You've got DevOps people who put logs into, it starts with
an S, I'm blanking on it, you know, the thing that parses logs. Logstash is not what I was thinking of, but that would be another thing you could do. Yes? No, it's not Elasticsearch. I'm not going to make you wait for me to think of it. Five minutes from now it's going to pop into my head, I'm randomly going to say the word, and you'll know where I was coming from. Anyway, when we can begin to intersect this operational data with business data, you can do some exciting things when those two domains can mix. When all of these are events and they're all happening in a message queue, you have that option.

Databases can also be events. This is the table/message interchangeability thing here. A table is really a collection of key-value pairs, to an approximation, right? Often there'll be a key and many values, or you could think of it as a tuple, a bunch of key-value pairs, but we can just use this abstraction here: there's a key and a value. Key 1, key 2, value 3, value 2. Over time, as that table is modified, we can think of the modifications as messages. At time zero we have key 1, value 1. At time one, what do we do? We add key 2, value 2. At time two we still have key 2, value 2, and we have key 1, value 3: we've changed a key. Well, each one of those things could now be a message. The first one is a mutation: put value 1 in at key 1. The second one is another mutation: put value 2 at key 2. The third one, we're changing key 1's value. You can see that updates to a table can be a stream of messages. Anything can be a message, anything can be an event, if we want it to be.

We can use this simply to store. When I say data pipeline, we can store these events; Kafka is capable of doing that, and we'll look at how it does it. And we can use it for real-time processing of those events; we're going to look at that too. Now, the historic approach to integrating systems looks a little bit like this. We've got all these
applications and all these databases and all these crazy point-to-point connections that are more or less hand-coded. That's often how we do this kind of thing. If we decide that all of those changes of state in all of these places are going to be a data pipeline, that becomes one stream of events, and we have a single streaming platform that can handle that. That's the promise of Kafka, right? That's what it's trying to do. All of these things that are involved in your life, your applications, a relational database, monitoring, purely user-facing things like search, can just hang off this streaming platform, hang off this event stream, and life is good.

Take search as an example. You might have user business activity coming into this thing as messages, and that search guy there might be pulling messages off the event stream and dumping them into an Elasticsearch cluster, which is now like its own little adjunct database out there. That's not heresy, that's not a bad thing to do. It is a secondary database. It's not the event stream, it's a different place to put things. But if searching is what you need to do, well, you can't really search in here. This doesn't do that; this is a queue. So if you need to do what Elasticsearch does, that's fine: dump that stuff in there and have a service that queries that cluster. That's okay to do. So you get a lot of flexibility here.

It is always forward compatible. What happened here is a product was viewed. We had an app, and it sent a product view request into the messaging system; that was the JSON object we looked at a few minutes ago. The streaming platform might also dump that event into Hadoop. Let's say that's all it does right now: we remember that the product view happened so we can do analytics on it later on. We put the product view message in, and this Hadoop service over here listens, dumps it
into HDFS, and we can analyze it later. When I say it's forward compatible, I mean other kinds of things can now easily get into that product view game. We've got this message bus: we can send messages in, get messages out, and do that same product viewing thing easily enough. It's easy to integrate new things with it, because we've got a single interface that all those things talk to. When that gets more complex, and we have to layer on security and real-time analytics and a recommendation engine, we've already got the product view message going into a product view topic, and those new services can listen to that and get their work done. So when I say it's forward compatible, I mean it's easy to extend the system, to evolve the system going forward.

Well, how can we build such a thing? This is an extremely high-level diagram, so let's dig a little into how Kafka works. These will be necessary details so that we can look at items three, four and five, the promised interesting production problems with Kafka.

All right, so basically, what have you got? Here are the pieces: you have producers, you have a Kafka cluster, you have consumers. Importantly, producers and consumers are clients of the cluster. These are our microservices, our monolith, whatever it is; these are in our application. This is an application library. You are writing this code; you are directly manipulating the producer and consumer API to put messages in and get messages out.

What's the fundamental data model of Kafka? Well, it's a message queue, so it should not come as a surprise that it is a log. When I put a message on, my next write will go on the end, and I am not able to change these things here. Once I put something in, I can't modify it. We call it a log, like a log file, like an operating system log or a Log4j log or something. Usually, if you're editing a log somewhere, you know, not
appending to it, you're probably covering up a crime, right? That's just a good rule of thumb. This is immutable, just like a log file, unless you're doing something very bad. You put stuff on the end and that's it. As for readers, or consumers, there could be multiple consumers of a single log, and they can each have their own point that they're reading from; we'll look at that in a little more detail. But fundamentally it's a log, the writer is called a producer, and the reader is called a consumer, like we saw before. And it's not really called a log in Kafka. It's called a topic, just like in every other messaging system in the world.

But Kafka is a distributed message queue, so there's a very important difference here from how this works on a single-master message queue. On a single-master message queue, the traditional enterprise messaging systems, whatever kinds of replication they support, for a write to happen there's one thread on one processor that you have to go to, and there is fundamentally one queue that they support. That means there's an upper bound on how big they can get and how fast they can go, but they can also always keep things in order.

Kafka is a distributed message queue. Every time I talk to people about distributed systems, I always try to give you a warning not to do it. It would be so much easier if you didn't. Build a small system, right? Don't build a big system. Choose a simpler life: you can live in the country, you could stop work at 3:30 or 4:00, and everything is better. Now, I know you're not going to take my advice. You're going to want to build a distributed system anyway, and most of us do, because business problems are getting bigger and systems are getting bigger, and, let's be honest, it's kind of cool, because we're the sort of people who think this is good. But any time you take something that you could have solved in a single thread on
a single processor and distribute it among multiple computers, something goes wrong in your life. If you've built such a system, you know it's true. Theoretically, mathematically, things are just bad; life is bad. Now, when it comes to Kafka, there really is only one major compromise that you're making as a result of it being a distributed message queue, and that is ordering.

A Kafka topic is a namespace, a collection of messages, and a Kafka cluster can have thousands of topics. It's theoretically, or basically practically, an unbounded number of topics, so having thousands of topics is not unusual. But each topic is a partitioned log, so that topic is going to be partitioned among multiple computers. I'll introduce the term in a moment: we call them brokers. Each one of these partitions is going to go on a different broker. When a producer writes, the producer will have to make a decision about what partition to write to.

What have I given up? Ordering. Normally we think of a queue as a thing that's ordered. In Kafka we only have ordering within a partition. When my writes are going here, here, here, here, I can't recover that ordering. I have ordering within each partition but not globally, and I can't ever have it globally if I'm going to partition. That's just a limitation; that's because of math. When you look at the complexity of most distributed databases and other kinds of systems, it's not that big of a deal, it's not that bad. And it turns out that at the API level, when you dig into what it really is to write the code to produce a message, you've got a lot of flexibility. You can choose, in a sense, which partition you're going to write to, and you choose how ordering will be preserved, so you do have a lot to say about that if you want. But fundamentally you do not have global
ordering.

All right, back to the table/stream duality that I was talking about at the beginning. We had those updates to the table; remember, I had the three different states of the table. Well, here they are: put key 1, value 1; key 1, value 2; key 2, value 3; key 2, value 4; key 1, value 5. We're updating the table, and each one of those updates is a message in the log. So a table and a stream are, or can be, isomorphic. You see that happening in that log right there.

I am able to create groups of consumers. Each consumer is a single thread, and every partition is consumed by only one of the consumers in the group. So a consumer group is a convenient way for me to say: all right, I've got these three or two or ten or 50 servers or threads or processes out here, and I've got some number of partitions in my topic, and I want all those partitions to be divvied up among these consumers. These are the consumers that are going to be processing the messages from this topic. I don't know how many partitions the topic has, but I know I've got these consumers, and Kafka divvies them up. I could have a single process handling all the partitions. If I have two processes and three partitions, two of my partitions will go to one and one will go to the other, and if one of those crashes, Kafka handles that and reassigns within the existing consumer group. So consumer groups are just a handy way to divvy up the partitions among multiple consumers.

Now, because the underlying abstraction is a log, we basically get filesystem performance. When you're a database, there's all kinds of stuff. If you look in detail at the write path and read path of a database, no matter what kind of database it is, it's a lot of work. There's a lot of housekeeping you have to do to build indexes
and merge log-structured trees and whatever it is you do as a database. It's kind of hard. In Kafka it's not that hard, so we can get hundreds of megabytes per second of throughput. You can have lots and lots of storage, many terabytes per node in the cluster, on commodity hardware, and writes are constant time: we're appending to the end of a log file. So you get a lot of throughput, and you need a surprisingly small number of nodes to handle a lot of work. If you compare it to trying to do the same sort of bandwidth in a Cassandra cluster, or even in HDFS, you find that you need many, many fewer machines in Kafka. It's just a lot faster. And we have things like replication, fault tolerance, partitioning and scaling. Those matter: when we go through these production failure scenarios, we're going to have to dig into those and look at how Kafka does those things. So that is our streaming platform.

Ah, a question? Pardon me? I don't like that microphone.

[Audience] Could you please move one slide back and say a few words about the guarantees [inaudible]?

This slide? Yeah. [inaudible] because I can if I want, but because you asked. There's not much here. Strict ordering we already talked about: I've got ordering inside a partition. And persistence. So usually, I've got a topic, that's this namespace for messages, and I'm probably going to partition it. That's the whole theory here: I've got too much work to do on one computer, so I'll have chunks of that topic on multiple computers. Probably each of those partitions will be replicated. Let's just say my replication factor will be 3, because that's sensible, so I'll have 3 copies of each partition. One of those replicas will be the leader, so when a producer is writing to the cluster, it writes to that lead replica. This is kind of a broader discussion of persistence: depending on how I've got it configured, when I write to that lead replica, it might also need
to propagate the write to followers. I might want to make sure that it gets written to the leader and to some number of the followers also. I'll stop there for time, but that's what I mean by persistence. It will at least be written: if I'm not replicating, done, I've written to disk there. But I will probably also replicate it, and there are a number of configuration parameters that tell me what it means for the write, the produce operation, to succeed.

A question there? Either shout or wait for the microphone.

[Audience] Since we're talking about guarantees: the last Jepsen test for Kafka that I found was from September 2013.

The last what test?

[Audience] The Jepsen test, by Kyle Kingsbury. It showed 50% write loss during split-brain. Why is this not changing? Why are there no new tests? He directly showed the guarantees were broken; there is no strong consistency.

Right. Kyle Kingsbury does that with pretty much all computer programs, right? He's able to find scenarios in which they break. Now, you said the last Kingsbury test was in 2013. I don't know why he hasn't done it since then. It would be good; the project has changed a lot since then. I'm sure there are JIRAs associated with what he did, and off the top of my head I don't know the history of those things. It's a fair question; I just don't know the details. And I am absolutely confident that there's some scenario that he can come up with, because he does it for everything. He's awesome. I've never met him. I'd like to just thank him for being the sort of person who wants to see the world burn. We need people like that. All right, anybody else? What's that? That was 0.8, right. Okay, so a while ago.

[Audience] I have a question. For example, I need to plug a new microservice into my system, and it needs to know what was going on, let's say, three hours prior to when it was plugged in,
and are there any capabilities for that?

Yes, consumers can rewind.

[Audience] Okay, so you can get data retrospectively?

Yes, subject to the retention policy. The retention policy is configurable: the default is seven days, and the maximum is forever. The New York Times famously keeps all of their content for all of their newspapers going back to the 1860s in Kafka, and for America that's a long time. The 1860s, that's like forever ago. So yeah, you can go all the way back to the limit of the retention policy. You do that by offset. You don't query, you know, based on content; otherwise it would be a database. But the point is, consumers can rewind.

[Audience] Okay, thank you.

You betcha. Any other questions? You guys are waking up, it's good.

All right, let's talk about some production problems. That's what we're supposedly here to do, and these are good: they help underscore some of the internals we've been talking about. Now, although Mr. Kingsbury claims there is no strong consistency at all, in typical cases there is, and it usually behaves like this. The way partitioning works is there is one lead partition and probably two followers. Pardon me? No: one lead partition, multiple followers. There, the cluster slide. There is one broker that is acting as the controller, and we'll talk about what it does; it's just out there. So there's the lead partition and follower partitions. Don't confuse partitioning with replication, right? Partitioning is how we scale a topic, so there could be many, many partitions and probably only three replicas of each. One of these is the leader and two are the followers for each partition.

Let's take a look at this diagram; let me explain it. There are four brokers, so four separate computers. I hope that looks black; that may be blue or black. Whatever color this looks like, those are the lead partitions, and these orange guys are the follower partitions. Now go across in rows here. This is only one topic. These brokers probably have lots
and lots of topics on them; we're just looking at one topic in this diagram. If you look at that top row, those are all partition one: topic one, partition one, one, one. Topic one, partition two, two, two. Then three, and four. Four partitions, one topic. One topic, four brokers, and you see the three replicas. I get these words confused all the time: four partitions, three replicas. The three replicas are on brokers one, two and three for the first partition, and two, three and four for the second partition. You see how they're laid out. So for each partition, one broker is the leader.

I have to define a new term, and that's in-sync replicas, or ISRs. The in-sync replicas are those replicas that are caught up with the leader. Remember, when we write to the leader, we propagate that write to the follower replicas, and it is possible for these replicas to fall behind and not to have acknowledged the most recent write. When they do, we take them out of the in-sync list. Ideally all of the replicas are up to date with the leader; sometimes they're not. For a write to be considered committed, it has to be received by the leader and all in-sync replicas. So if a replica has fallen behind, we temporarily say we're not going to worry about you. If a message hasn't been acknowledged by the leader and the ISRs, it hasn't been committed and we can't read it.

All right, with that in your head: replicas can fall behind, and the ISR list can grow and shrink. We can have all replicas in sync, we can have some replicas in sync, and what happens if it gets too small? Well, in a real-world scenario encountered by support, this happened. Sometimes this is going to happen: the ISR list is going to shrink, there's going to be some load, you might get an alert, oh no, there are some replicas that aren't in sync, and then it clears. So in a real-world
customer scenario that happened. Let's say last week there was an alert on the ISR list getting too small, and then it got better. Okay, moving on, right? No big deal. Then it happened again. As far as the admin knew, nothing had changed, but the error didn't clear. What happened? Well, there was a bunch of digging and looking at network queues and all kinds of things going on. Again, like I said, these scenarios are not detective stories, so don't be disappointed that I'm just giving away the ending. It turns out one of the brokers had gotten an update. It was running a new version of Kafka and the others weren't. That new version fixed a bug, which caused the broker to run faster and to stay out of sync with its replicas. So the ISR list was permanently small, because we had upgraded this one broker to a newer version that had fixed a performance bug, and so you had this permanently out-of-sync thing. The moral of the story: watch your ISR list. Also the moral of the story: don't upgrade just one broker. That's probably an absolutely terrible idea, so I hope somebody was counseled. Yeah?

[Audience] Is it possible to update the whole cluster live, all the brokers at once, or one at a time?

You usually roll those. You would want to roll that: you upgrade one, then upgrade the next, and the next, and the next. You don't want to upgrade one and then go home and come back on Monday and keep going. That would be the bad thing. So don't do it on a Friday, ever. And you don't do them all simultaneously, because then the cluster would go down. But you can take replicas down.

[Audience] Will replicas be automatically relocated to a new broker if you take one broker down? Will the replicas that are reassigned still be consistent? If you have, for example, a replication factor of three, with replicas on three brokers, and one of those brokers
goes down or is just shut down for maintenance, will additional replicas be created so that the replication factor is still fulfilled?

If a broker goes down, yes, replicas can get moved. There's a lot there; let me just simplify so we can get to the other scenarios. Yes, but there are a lot of details.

[Audience] Tell me please, is it possible to check whether the Kafka cluster is alive and in a good health state?

Well, thank you for that question. Yeah, of course you'd want that to be automated. There was another customer that had set this up. They had an automated health check, and they were Docker people, which is great, so Kafka was running in Docker containers. They set up a health check that was checking to see if the producer port was open: basically, can I open a socket to the producer port? If yes, it's good; if no, it's bad. That was their homegrown health check. If they weren't able to open a socket to the producer port, that broker was dead, so they would tear down the Docker container, spin up a new one with that same broker ID, and go from there. Which is great, right? But they had this scenario where they had a broker that kept failing. They'd tear it down and bring it back up, and it would fail. Tear it down, bring it back up, over and over. They were getting all these alerts that nobody likes, causing all kinds of pages. So they said, well, something's really wrong here, let's just bounce the cluster. There's going to be a little bit of unavailability, sorry.

Well, the problem here, quite apart from the cluster going down: more partitions means more throughput, right? If I've partitioned a topic across more brokers, that means there are more computers available to do I/O. So in general I want more partitions. But the more partitions I have, the longer it's going to take to bounce the cluster. So they took it down and brought it back up, and didn't realize that it could take a few minutes, if they've
got tens of thousands of partitions, to take the whole cluster down and back up. Fast-forwarding a little: that decision to go down and come back up was a little bit surprising. They didn't realize that, because of the large number of partitions they had, it was going to be a slower operation than they thought. They thought it was going to be a quick thing; it turned out to be a few minutes, and it was super embarrassing. Also, once they bounced and came back up, that node was still dead. It turns out it was just a connectivity problem, a router configuration thing, where the automation couldn't talk to that one dumb broker, and they took the whole cluster down and brought the whole cluster up for nothing. It's terrible. Anyway, the moral of the story is that automation is a sharp knife. It's a good thing, we need sharp knives, but you can also hurt yourself with a sharp knife, which is what they did.

Okay, let's see if we can get through this one remaining scenario in the next three minutes. Let's say you're old school. You're not in the cloud, you still run your own infrastructure. It's fine, I like them: they're noisy, they smell good, the room's a little cold, but hey, sometimes that's nice. You finally get the new servers in, you spin one up, you install Kafka, you give it a unique broker ID. And you rehearsed this in the staging cluster, of course, right? You add the broker in the staging cluster, you've got some traffic, everything's fine, it's great.

Well, how did you do that? There's this tool called kafka-reassign-partitions. If you're adding a broker, you're going to have to move partitions to it. This tool has a few different modes. It's got a generate mode, where we tell it: hey, look, I want you to generate migration instructions for brokers 5 and 6. Whatever partitions are on those suckers, just move them around, do what you've got to do, create all this JSON for me to do that work, and then I'll take the JSON you generated and I'll execute it.
There's also a verify mode if you want. But I create that JSON; I generate it. So you do that in staging: you generate, you execute, it works fine. You go to production and it doesn't work so well.

Now, what you were going to do is this: you have these three brokers with those partitions, like what we saw before, and now you're going to have these four brokers. You're going to move a couple of the partitions to broker 4. That's fundamentally what the command does when you're adding a broker. This customer, when they did that, got a little upset. They ran the migration, you know, they generated, they executed, hit go. Okay, this should take like five seconds, everything's great. And all of a sudden bells started going off and SLAs were being violated. The cluster slowed to a halt, because generate pays attention to the actual number of partitions that are in the cluster, and in their production cluster they had one or two orders of magnitude more partitions than they had in staging. Which is always the case, right? Staging and production are never all that similar. So they were doing this extremely expensive operation in production.

So the moral of that story is: buy Confluent Enterprise and its Auto Data Balancing feature, or pay attention to your dang script, right? They did that unthinkingly. They generated the JSON, they did not look at it, they just ran it like they did in staging, and that ended up being dangerous. Auto Data Balancing is part of the commercial stuff that Confluent makes. It's a feature that automates the rather manual process of moving partitions around.

Anyway, if you want to know more, if you want to play with it: confluent.io/download, or look for a meetup, or join us in our Slack channel. I hope you learned a few things about the way Kafka works. We are at time. Thank you very much.
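
Editor's sketch, not part of the talk: the "look at the JSON before you run it" advice from the last scenario could be followed with something like the small Python script below. It assumes the version-1 JSON layout that kafka-reassign-partitions reads and writes (a `partitions` list whose entries carry `topic`, `partition` and `replicas`). The function name `summarize_plan`, the topic name `product-views` and the broker IDs are hypothetical, chosen only for illustration.

```python
import json
from collections import Counter


def summarize_plan(current_json: str, proposed_json: str) -> dict:
    """Compare a current and a proposed assignment (reassignment JSON format)."""

    def index(doc: str) -> dict:
        # Map (topic, partition) -> ordered list of replica broker ids.
        return {(p["topic"], p["partition"]): p["replicas"]
                for p in json.loads(doc)["partitions"]}

    current, proposed = index(current_json), index(proposed_json)
    changed = 0
    new_copies = Counter()  # broker id -> replicas it would have to copy in
    for tp, target in proposed.items():
        before = current.get(tp, [])
        if before != target:  # a pure reordering also counts as a change
            changed += 1
        for broker in set(target) - set(before):
            new_copies[broker] += 1
    return {
        "partitions_in_plan": len(proposed),
        "partitions_changed": changed,
        "replica_copies_by_broker": dict(new_copies),
    }


# Hypothetical three-partition topic; broker 4 is the newly added broker.
CURRENT = json.dumps({"version": 1, "partitions": [
    {"topic": "product-views", "partition": 0, "replicas": [1, 2, 3]},
    {"topic": "product-views", "partition": 1, "replicas": [2, 3, 1]},
    {"topic": "product-views", "partition": 2, "replicas": [3, 1, 2]},
]})
PROPOSED = json.dumps({"version": 1, "partitions": [
    {"topic": "product-views", "partition": 0, "replicas": [1, 2, 4]},
    {"topic": "product-views", "partition": 1, "replicas": [2, 3, 1]},
    {"topic": "product-views", "partition": 2, "replicas": [4, 1, 2]},
]})

summary = summarize_plan(CURRENT, PROPOSED)
print(summary)
```

Run against a real generated plan, a summary like this makes the difference between a staging-sized move and a production-sized move visible before anything is executed. It only counts replica copies; it says nothing about how many bytes each copy involves, which is the other half of the cost.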

21 thoughts on “Lessons learned from Kafka in production (Tim Berglund, Confluent)”

  1. Q1: Can a broker manage multiple partitions of the same topic? In other words, can the number of brokers be less than the maximum number of partitions in a topic?

    Q2: If answer to above is 'no' then can we say that in order to add new partition we need to add new broker first?

    Q3: What is the best way to reduce the number of partitions? Should one just delete the extra partition, and will the broker handle deletion of its replicas as well? Or is there a way to deactivate a partition, i.e. set that extra partition to read-only mode?

  2. 17:07 What if I use Kafka with just a single partition and a single broker? I like Kafka not just for its distributed nature but for how it processes streams, for KSQL, and also for the environment.

  3. such nice arms on this guy. if only he waved his biceps around more. or wore a tank-top. hair needs a bunch of work though.

  4. To add, "Commodity Hardware" here apparently means machines with a minimum recommended RAM size of 64GB.
    Would I call that commodity hardware? Probably no. But just wanted to point that out.

  5. TBH … They should have more experienced person giving these talks … This guy does not seem to have any hands on real experience

  6. Is he talking about Hibernate version 1? I've seen similar pictures 10 years ago on the meeting on hibernate… what is the useless talk… I am disappointed.

  7. "Starts with an S I'm blanking on it" ?
    I heard "starts with an S I am plunking on it" LOL, I thought that was just a very nice joke from Tim to suggest Splunk.

  8. This one is a better insight on particular issues in Kafka production.. https://www.youtube.com/watch?v=MhyW1FUdN8I
