Real-Time Search on Terabytes of Data per Day: Lessons Learned

All right, hi everybody, thank you for coming. I know there are a few more people streaming in, but it's the last presentation before lunch, so I know you're eager to get this over with and get some food. We're going to be talking about time-oriented event search at a new level of scale. The title is slightly different from what's in the program, but it's the same talk, I assure you.

By way of introduction, I'm Joey Echeverria, the platform technical lead at Rocana. Michael Peterson, one of our platform engineers, will be co-presenting with me. I'll talk a little bit about what Rocana does, why we needed our own search engine, and how we architected and built it.

For some context on Rocana: we built an application for handling very large volumes of machine data. By machine data I mean logs and metrics; if you're used to IT monitoring software, we're basically ITOA or ITOM software. Our solution runs on-premises or in the cloud, but the key thing is that we don't do software as a service. Each of our customers installs a copy of our application on their own Hadoop environment; this isn't something where we get to manage and control everything. That means supportability is a very big deal for us. We want our customers to have predictability above anything else. The software has to deal with wonky environments, and we want relatively consistent performance, even if it's not optimal performance, so that there's very little deviation. We also want it to be very easy to configure and operate. Basically all of our decisions about how we approach event search are informed by these constraints. Your particular environment and use cases may have nothing in common with how we need to use search, so don't take everything we say as gospel.

So I said a new level of scale; what do we mean by scale? A typical environment for our customers is collecting tens of terabytes per
day. We're talking about every piece of log data generated in their entire data center. This isn't just one department or one part of the organization; this is the entire IT staff sending all of their log data. If you're looking at 20 terabytes of event data per day, with event sizes between 200 and 500 bytes, you're talking about between 43 billion and 100 billion events per day that we want to ingest and make available for query.

The other constraint is that we want this data to be available online for query for potentially years. That means we have to make design decisions that let you access older data without the existence of that older data hurting the performance of queries on new data. The reality is that most queries are going to hit the hot data, but you still want to be able to go back without having to do things like replay data by hand or move data around between cold, warm, and hot stores.

Before I get started on our stuff, I want to talk a little bit about what we used before we built our own thing: we originally started with SolrCloud. Most of what I'm going to talk about also applies to Elasticsearch, but SolrCloud is the system we were using. When we started building our application, we wanted to take advantage of a general-purpose search system that was highly scalable, could sit on top of HDFS, and could replicate data for us, and SolrCloud is a very good system for that. It has an amazing feature set, especially for a general-purpose search engine, and very good support for relatively moderate scale. It's excellent at content search; think news sites and documents. It's really good if you have a finite data set, like a product catalog, where you may be getting updates but you're not getting tons of new data every single day. And it's good for what we call lower-cardinality data
sets. When I talk about cardinality, I mean unique values per field, or per column. For a product catalog there's actually a relatively limited number of unique values for every field; other than something like the ISBN, values are pretty common across items in the same category. The other thing is that when you talk to people who have used these general-purpose search engines at very high scale and very high rates, their data sets fit in memory. Usually that's memory across the cluster, which is why they're using SolrCloud or Elasticsearch, but they're still talking about data sets that can reside in RAM. When you're pulling in 20 terabytes of data per day and you want years of it online for query, you can't store all of that in RAM. You have to go to disk.

So here are some of the problems we found with SolrCloud. The fixed shard model was very painful: when you create a collection, you tell it how many shards it has, and that's it. You can't easily change the number of shards online without reindexing all of your data. You can't do multi-level or semantic partitioning: by default, documents are partitioned by the hash of the event or document ID. You can do things like range-based partitioning on time by layering it on top, but it is layered on top; it's not something you get down at the storage layer. All the shards are open all the time, which means that even if you have cold data mixed with hot data, you're spending resources to keep those searchers up and ready to go, so the longer you keep data, the slower everything gets. For customers with very high rates of event ingest, we had to prune their data sets roughly every five days to keep performance consistent. The APIs are more optimized for record-at-a-time, near-real-time indexing. There is some support in the APIs for
batch-based indexing, but everything also goes over REST. If you take binary data, blow it up into XML, send it over HTTP, and then turn it back into binary data on the other end, that's a lot of data conversion and a lot of overhead just to do your ingest. Ingest concurrency is also wonky: we were only ever able to light up about two cores on eight- or 16-core systems with SolrCloud. We did as much tuning as we could, and we got to the point where there were all these little things adding up.

A big one is write amplification. The way all of these systems work, new data comes in, gets written to disk, and eventually gets merged with older data to optimize the index. With Solr, because you're always writing old data, very old data, and new data into the same shards, that old data gets rewritten many hundreds or thousands of times. And then there are a bunch of other weird small things.

Usually at this point people ask me a bunch of questions like "did you try this, did you try that?" We did try a bunch of things to scale Solr further. Using multiple collections as partitions and doing query planning above the Solr level: that's kind of crazy, and we didn't actually get that far. We did use Solr's ability to route events to partitions based on different criteria (I don't remember exactly what Solr calls it), and we routed on time, because we're a time-oriented event system and every query has time in it. Running multiple JVMs per node: don't ever do that. If you're running it as a service in your own data center and you're the only one who cares about it, that may be fine, but telling customers to spin up several JVMs, configure them with different port numbers, and remember which port is for which process? Just don't do that. Pushing historical searches into another system is very common, where you maybe have a long-term grep system based on Spark or
MapReduce. That's just silly; you want all of your data searchable through one system. Building your own caching layer: again, if you have to do that, you're really in a bad way. At some point, after trying all these different hacks and pushing Solr as far as we could, we got to the point where patching over what we had was more expensive than building something new ourselves. Again, this is not to say that SolrCloud or Elasticsearch are not great systems; they are. It all depends on your use case, and for our use case they just weren't cutting it. Unless you're getting to our scale, or to use cases with similar pain points, go ahead and use those general-purpose systems; they're going to do just fine for you. Now I'm going to invite Michael up to talk about the architecture of what we built and how we solved some of these problems.

All right, so we built a thing, which right now we call Rocana Search. Let's dive into the goals and the design of Rocana Search. We identified four important goals. The first three are about scaling better: scaling better horizontally, scaling better vertically, and scaling better over time. Remember that we deal with time-oriented data, so everything comes in with a timestamp, and we can leverage that to build a more scalable system. We want higher indexing throughput per node than Solr gives us in the product we currently have. We want to scale horizontally better, basically to Hadoop's limits only. And, as Joey said, we want to be able to keep arbitrary amounts of data on disk without that slowing down current ingest or queries on any particular time slice. The fourth goal is the ability to add and remove nodes without any manual intervention.

As we've said, we run on Hadoop, and we leverage the features that Hadoop gives us. We built on top of Lucene, and we write our Lucene indexes to HDFS.
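A toy model helps make the write-amplification argument, and the "scale better over time" goal, concrete. The sketch below is a hypothetical simplification, not Lucene's or Solr's actual merge policy: each flush writes one small segment, and whenever `merge_factor` equal-sized segments pile up they are rewritten into one larger segment. It compares one long-lived shard that keeps absorbing new days of data with a per-day time partition that stops growing once its day is over; the flush rate and merge factor are made-up parameters.

```python
def write_amplification(num_flushes: int, merge_factor: int = 10) -> float:
    """Average number of times each flushed event gets written to disk.

    Toy model (hypothetical, not Lucene's actual merge policy): every flush
    creates one level-0 segment, and whenever `merge_factor` segments pile up
    at a level they are merged, i.e. rewritten, into one segment a level up.
    """
    if num_flushes <= 0:
        raise ValueError("num_flushes must be positive")
    segments_at_level: dict[int, int] = {}
    written = 0  # total writes, measured in "one flush worth of events"
    for _ in range(num_flushes):
        written += 1  # the flush itself
        level = 0
        segments_at_level[level] = segments_at_level.get(level, 0) + 1
        # A full level triggers a merge, which may cascade to the next level.
        while segments_at_level[level] == merge_factor:
            segments_at_level[level] = 0
            written += merge_factor ** (level + 1)  # size of the merged segment
            level += 1
            segments_at_level[level] = segments_at_level.get(level, 0) + 1
    return written / num_flushes


FLUSHES_PER_DAY = 100  # hypothetical flush rate for one shard

# One long-lived shard keeps merging new data with ever-older data...
for days in (1, 10, 100):
    wa = write_amplification(FLUSHES_PER_DAY * days)
    print(f"single shard after {days:>3} days: {wa:.1f}x")

# ...while a per-day partition stops merging once its day is over.
print(f"per-day partition, any age: {write_amplification(FLUSHES_PER_DAY):.1f}x")
```

With these assumptions the script prints 3.0x, 4.0x and 5.0x for the single shard after 1, 10 and 100 days, and 3.0x for a per-day partition regardless of its age. The growth is only logarithmic in this idealized model; real merge policies, deletes, and forced merges behave differently, so treat the numbers as illustrative rather than a reproduction of the Solr behavior Joey described.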
Writing to HDFS gives us both our failover model and our replication model. We also do our ingest from Kafka. Kafka is the basic messaging system we've used in Rocana Ops from the beginning, and we leverage it here for high-throughput ingest: every node runs an ingester that pulls from Kafka and writes Lucene indexes onto HDFS.

Within Rocana Search there are four high-level sub-services: the indexer sub-service, what we call the coordinator and the executor, and finally a metadata sub-service, which I'll talk about on the next slide. Because we now have a sharding model, with the system running across multiple Hadoop nodes, each individual search of each individual Lucene index, of which there might be thousands, is done by an executor. An executor issues a query against one particular Lucene directory, and a single query might touch hundreds of them. A query comes into a coordinator; the coordinator works out which partitions are in scope for the query and which executors to send it to; the executors run the actual query and send their fragment of the results back to the coordinator; and the coordinator aggregates and assembles the fragments and sends the result back to the client. Every node runs both of these services, so a query client can contact any node it wants to do the coordination, and all of them run executors.

In the gold boxes on the diagram you see the four sub-services I mentioned: the indexing service, the metadata service, the coordinator, and the executor. In the purple boxes you see our three external dependencies: HDFS, Kafka, and, for metadata, ZooKeeper. Starting with the indexer: as I said, it writes Lucene indexes to HDFS, pulling the data out of Kafka, and Kafka is populated by our data producers, such as syslog-type data. When a query comes in, it goes to the coordinator and then to the executors, and the executor, as I said, is what actually reads the Lucene indexes using Lucene's query system.
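The coordinator/executor split is a scatter-gather pattern. Below is a minimal, single-process sketch under stated assumptions: `Executor`, `search_slice`, and `coordinate` are hypothetical names rather than Rocana Search APIs, a substring match stands in for a real Lucene query, and each executor is assumed to return its fragment sorted newest-first so the coordinator can merge the fragments and cut off at a limit.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from itertools import islice


@dataclass(frozen=True)
class Hit:
    timestamp: int  # epoch millis
    event_id: str


class Executor:
    """Hypothetical executor holding some slices (stand-ins for Lucene directories)."""

    def __init__(self, slices: dict[str, list[Hit]]):
        self.slices = slices

    def search_slice(self, slice_dir: str, term: str) -> list[Hit]:
        # Stand-in for a Lucene query; returns this slice's hits newest-first.
        hits = [h for h in self.slices.get(slice_dir, []) if term in h.event_id]
        return sorted(hits, key=lambda h: h.timestamp, reverse=True)


def coordinate(plan: dict[str, Executor], term: str, limit: int) -> list[Hit]:
    """Scatter one sub-query per slice, then gather and merge the fragments."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(ex.search_slice, d, term) for d, ex in plan.items()]
        fragments = [f.result() for f in futures]
    # Every fragment is already newest-first, so a lazy k-way merge is enough.
    merged = heapq.merge(*fragments, key=lambda h: h.timestamp, reverse=True)
    return list(islice(merged, limit))


node1 = Executor({"day=01/slice-0": [Hit(3, "err-a"), Hit(1, "err-b")]})
node2 = Executor({"day=01/slice-1": [Hit(2, "err-c"), Hit(4, "ok-d")]})
plan = {"day=01/slice-0": node1, "day=01/slice-1": node2}
print([h.event_id for h in coordinate(plan, "err", limit=2)])  # ['err-a', 'err-c']
```

Merging pre-sorted fragments lazily means the coordinator only materializes as many hits as the client asked for, however many slices were searched.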
Finally, the metadata system has the important job of keeping all the nodes in sync. A few pieces of information need to be kept in sync across the cluster. One obvious one is which nodes are actually up and running, because, as we said, we want a dynamic system where you can stop and start nodes. The second is ownership of shards; I'll talk more about why shard ownership is important in a few slides. The third is which partitions exist on the cluster: because we have a dynamic partitioning model, which I'll also describe in a minute, that isn't known a priori the way it is with Solr or Elasticsearch. As new partitions, new shards, are created, the metadata system needs to keep all nodes up to date on which partitions exist around the cluster, which is particularly important for search to know which directories to search.

So let's talk about our sharding model. We don't actually use the term shard; I've used it so far because it's the more generic term, but we talk in terms of time partitions and slices, and also data sets. Data set is our generic term for basically any ingest schema. Right now we only support one ingest schema, the basic one we use for ingesting our syslog-type data; in the future we'll expand to support multiple data sets. We take each data set and time-partition it using a partition strategy. Right now we support two: year/month/day and year/month/day/hour. Each event that comes in has a timestamp, and that routes it to a particular directory on HDFS. So our partition strategy is very similar to the Hive partitioning model, where it's directly based on directories in HDFS, and the partitioning is invisible to queries. All of our queries have the format "from time X to time Y", potentially with some additional constraints like host=foo, so we can figure out which time partitions we actually need to search without having to search everything.
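A year/month/day partition strategy and the time-range pruning it enables can be sketched in a few lines. The Hive-style `year=/month=/day=` directory names, the `events` data set name, and both function names are assumptions for illustration; the talk only says the layout is similar to Hive's. Timestamps are assumed to be timezone-aware and partitioned by UTC day.

```python
from datetime import datetime, timedelta, timezone


def partition_dir(dataset: str, ts: datetime, hourly: bool = False) -> str:
    """Map a timezone-aware event timestamp to its time-partition directory."""
    ts = ts.astimezone(timezone.utc)
    path = f"{dataset}/year={ts.year:04d}/month={ts.month:02d}/day={ts.day:02d}"
    return path + (f"/hour={ts.hour:02d}" if hourly else "")


def partitions_in_range(dataset: str, start: datetime, end: datetime) -> list[str]:
    """Daily partitions that overlap the half-open query range [start, end)."""
    if end <= start:
        return []
    day = start.astimezone(timezone.utc).replace(hour=0, minute=0, second=0,
                                                 microsecond=0)
    dirs = []
    while day < end:  # partitions outside the range are never touched
        dirs.append(partition_dir(dataset, day))
        day += timedelta(days=1)
    return dirs


event_time = datetime(2016, 1, 2, 13, 5, tzinfo=timezone.utc)
print(partition_dir("events", event_time))               # events/year=2016/month=01/day=02
print(partition_dir("events", event_time, hourly=True))  # events/year=2016/month=01/day=02/hour=13

# "from time X to time Y": only the overlapping days get searched.
x = datetime(2016, 1, 1, 22, 0, tzinfo=timezone.utc)
y = datetime(2016, 1, 3, 1, 0, tzinfo=timezone.utc)
print(partitions_in_range("events", x, y))
```

Because the partition list is computed from the query's time range alone, planning a query in this sketch costs the same whether one week or several years of partitions sit on disk.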
So the time-partitioning model is the first part of our sharding model. The second part is that we divide these partitions into what we call slices. For a particular day we might have, in this example, 20 slices: 20 separate directories where 20 separate Lucene indexes are being written. The slice is the lowest level; each one is a Lucene directory. We tie the number of slices in each time partition to the number of Kafka partitions. When you set up a topic in Kafka, you define the number of partitions it will have. If you set it up with 20, we're going to ingest 20 separate streams in parallel, which means we'll also be writing to 20 separate slices for each time partition.

Let's look at this graphically, because the term partition is overloaded here: there are both Kafka partitions and our time partitions, so I'll try to use those two terms to keep it clear. For a particular data set, in this example we have two time partitions, January 1 and January 2, and we've defined N slices, where N is again determined by the number of Kafka partitions you set up when you created your Kafka topic. Every time partition has the same number of slices, and as events come in, you route each one to the right time partition and slice. Here is a three-event example: events 1 and 2 are for January 1, and event 3 is for January 2. They come into Kafka and get routed through a hashing scheme, where we look at the body of the event to figure out which Kafka partition it needs to go to. In this case event 1 went to Kafka partition 0, and events 2 and 3 went to Kafka partition 1.
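Putting the two levels together, routing an event takes two independent decisions: the producer hashes the event body to pick a Kafka partition, and the consumer maps that Kafka partition number to a slice and the event timestamp to a time partition. In this sketch CRC32 is a stand-in hash (the talk doesn't name one), the slice count equals the number of Kafka partitions as described, and the helper names and `slice-N` directory naming are hypothetical.

```python
import zlib
from datetime import datetime, timezone

NUM_KAFKA_PARTITIONS = 20  # also the number of slices per time partition


def kafka_partition(body: bytes, num_partitions: int = NUM_KAFKA_PARTITIONS) -> int:
    """Producer side: hash the event body to pick a Kafka partition.

    CRC32 is a stand-in; the talk does not say which hash function is used.
    """
    return zlib.crc32(body) % num_partitions


def slice_dir(dataset: str, ts: datetime, kafka_part: int) -> str:
    """Consumer side: timestamp -> time partition, Kafka partition -> slice."""
    day = ts.astimezone(timezone.utc).strftime("year=%Y/month=%m/day=%d")
    return f"{dataset}/{day}/slice-{kafka_part}"


events = [
    (b"event 1", datetime(2016, 1, 1, 9, 0, tzinfo=timezone.utc)),
    (b"event 2", datetime(2016, 1, 1, 17, 30, tzinfo=timezone.utc)),
    (b"event 3", datetime(2016, 1, 2, 3, 15, tzinfo=timezone.utc)),
]
for body, ts in events:
    part = kafka_partition(body)  # which Kafka partition the event lands in
    print(body.decode(), "->", slice_dir("events", ts, part))
```

Note that the slice comes only from the Kafka partition and the time partition only from the timestamp, which is why two events from the same Kafka partition can land in the same slice number of two different days.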
When Rocana Search pulls them out of Kafka, it pulls at the level of the Kafka partition. Event 1 goes to slice 0 because it came from Kafka partition 0, and when we hand its timestamp to the partition strategy and ask which directory, which time partition, it belongs in, the answer is January 1. Events 2 and 3 came from Kafka partition 1, so they go to slice 1, but their timestamps fall in different periods: one goes to January 1 and one goes to January 2.

I mentioned a little earlier the idea of ownership of a shard. If you look at the two purple nodes, those are two separate Rocana Search nodes, each of which has four time partitions, because events have come in mapping to four different days so far. There are four slices because we set up the Kafka topic with four partitions, so each time partition has four slices, but an individual node only has a subset of them, as defined by Kafka. When a Kafka consumer registers with Kafka, Kafka looks across its set of consumers and says, "I have four Kafka partitions and two nodes, so I'm going to give two Kafka partitions to this one and two to that one." In this case Kafka assigned slices 0 and 2 to node 1 and slices 1 and 3 to node 2. But that's dynamic: if node 2 were to go down, Kafka would rebalance and give all the slices to node 1, and if we added a third node, Kafka would rebalance and give two slices to one node and one slice to each of the other two. That's the heart of why we can add and remove nodes dynamically.

So let's talk more about the write path. At any given point in time, each search node owns one or more slices, and a slice is only ever owned by one node at a time, but, as I mentioned a second ago, that ownership can change over time. We also consume in batches for throughput: we pull off Kafka in batches and then write to Lucene in batches.
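The slice-ownership example (four Kafka partitions, two nodes, slices 0 and 2 on node 1 and slices 1 and 3 on node 2) is what a round-robin assignment produces. In the real system Kafka's consumer-group rebalancing makes this decision; `assign_round_robin` below is a hypothetical stand-in that shows why ownership follows automatically from the set of live nodes.

```python
def assign_round_robin(partitions: list[int], nodes: list[str]) -> dict[str, list[int]]:
    """Deal Kafka partitions (i.e. slices) out to the live nodes like cards."""
    if not nodes:
        raise ValueError("need at least one live node")
    assignment: dict[str, list[int]] = {node: [] for node in nodes}
    for i, partition in enumerate(sorted(partitions)):
        assignment[nodes[i % len(nodes)]].append(partition)
    return assignment


slices = [0, 1, 2, 3]
print(assign_round_robin(slices, ["node1", "node2"]))
# {'node1': [0, 2], 'node2': [1, 3]}
print(assign_round_robin(slices, ["node1"]))  # node2 went down
# {'node1': [0, 1, 2, 3]}
print(assign_round_robin(slices, ["node1", "node2", "node3"]))  # node3 joined
# {'node1': [0, 3], 'node2': [1], 'node3': [2]}
```

Nothing in the sketch is node-specific: rerunning it with a different set of live nodes yields the new ownership map, which is the property that lets nodes be added or removed without manual steps.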
As each event comes in, we look at its timestamp and use the partition strategy to figure out which time partition it needs to go to, and we can ask the Kafka message which Kafka partition it came from, which tells us which slice to write it to. With the time partition and the slice, we know exactly which directory on HDFS to write to. We also have a configurable interval for how often to do Lucene commits, so you can set it to one minute, five minutes, ten minutes, whatever makes sense for your throughput needs. Finally, because of our time-based scaling model, new partitions are created as each new day arrives, so exactly which partitions exist on the cluster (when did you start, and how far have you gotten?) is not known a priori. The metadata service is in charge of keeping the system up to date: as the indexers create new partitions, they inform the metadata service, which then broadcasts that information out.

Slice ownership is very important on the query side. Queries come in via RPC; we use Avro for our data storage format, for Kafka, and also for our RPC model across the cluster. When a query comes in, the coordinator hands it off to Lucene and says "parse this query for us," which gives us back an AST. We can walk the AST to figure out which time partitions are in scope and search only that subset, as opposed to everything on the cluster. With this pruning model we can be resistant to however much data you have and focus only on the areas we need to query. Once the coordinator knows which time partitions to search, it has to send the query to all of their slices, one executor per slice, and you want to send it to the executor that owns the slice. Ownership means that, as I showed in the diagram before, Kafka has assigned slice 0 to node 1, so node 1 is actively writing to that slice, or may
have actively written to it yesterday if it's been up for a few days. By sending the query to the executor on that same node, you get a couple of advantages: you can leverage local reads, and you can also leverage Lucene's NRT (near-real-time) features, which let you search data that has been written to RAM but not yet committed. So the coordinator looks up in the metadata system who owns each slice and sends the query to the executor for that slice, and, as I said before, the results are sent back to the coordinator, assembled, and sent back to the client.

So, some benefits of this design. As I said a minute ago, searching and ingest happen on the same node, and we preferentially send queries to the nodes that own the slices, which means we get the advantage of local disk reads, and possibly local RAM reads if the slice is currently being actively indexed by Lucene. That gives us HDFS short-circuit reads, and we could also leverage additional HDFS caching features, which we're not yet doing for search. We also use an off-heap block cache, basically the same way Solr does. Then there's our failover model: if any node goes down, any other node can take over its slices. And even if no node goes down, if for some temporary reason nobody owns a slice in ZooKeeper, which is where we keep the slice-ownership information, the coordinator doesn't give up. It says, "Nobody currently owns slice 3, but I can still send that query out; I can pick any node to look at slice 3," because with HDFS you can just do cross-network reads as a failover model for queries. There's obviously always HDFS overhead, but that overhead gives us our replication model, and we can minimize it for writes.
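The executor-selection rule can be summarized as: prefer the live owner of the slice, and if there isn't one, pick any live node and let it read the slice over the network from HDFS. In the talk, ownership is kept in ZooKeeper; here a plain dict stands in for it, and `choose_executor` is a hypothetical name. Choosing the fallback node at random is an assumption; the talk only says any node can be picked.

```python
import random
from typing import Optional


def choose_executor(slice_id: str, owners: dict[str, str], live_nodes: list[str],
                    rng: Optional[random.Random] = None) -> str:
    """Pick the node whose executor should run the query for one slice."""
    owner = owners.get(slice_id)
    if owner in live_nodes:
        # Preferred: local (short-circuit) HDFS reads, plus Lucene NRT data in RAM.
        return owner
    if not live_nodes:
        raise RuntimeError("no live nodes available")
    # Nobody alive owns the slice: any node can open it over the network
    # from HDFS, trading some read overhead for availability.
    return (rng or random.Random()).choice(live_nodes)


owners = {"slice-0": "node1", "slice-1": "node2"}  # stand-in for ZooKeeper
live = ["node1", "node3"]                          # node2 is down
print(choose_executor("slice-0", owners, live))    # node1
print(choose_executor("slice-1", owners, live))    # node1 or node3
```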
The same goes for queries, because we always try to go to where the data is local.

This product is not yet GA; we're still building it. But I have done some initial benchmarks, and I figured you'd want to see where we're at. This is still early days: all of this is preliminary and subject to change. On our most recent head-to-head of Rocana Search versus Solr, the conditions were: the CMS garbage collector, both systems running on HDFS on AWS, in a small cluster of just four nodes and eight shards. We did a 12-hour run. In those 12 hours Solr indexed about 300 gigabytes, with events of about 400 bytes each, and Rocana Search indexed almost a terabyte in the same time frame. At the comparison point of 300 gigabytes on disk, Solr was doing about 11,000 events per second (again, about 400-byte events), and Rocana Search was doing over 36,000. So from these initial numbers, we're happy to see that Rocana Search looks to be about 3x faster per node than Solr, and we expect it to do even better over time. In a five-day or two-week run, Solr, as Joey said, has to keep writing into the same shards, whereas Rocana Search creates new shards over time, so you have smaller indexes to search and write to every day. We hope to see even better numbers when we do the longer runs.

The last set of benchmarks I have is around query, and I have two here. The first is query round-trip time during ingest, meaning all the nodes are actively ingesting a large data stream as fast as they can. This is a latency distribution by percentile, with seconds on the y-axis; Solr is the red line and Rocana Search is the blue line. During ingest, 90% of Solr queries were done in about a second and a half, and Rocana Search queries in about two seconds, so we are slower than Solr. And in fact, here is a similar graph for
the case with no ingest. So right now we're slower than Solr. Our goal is to at least be competitive, and we have a number of optimizations we know we can apply to our query model that we haven't gotten to yet, so we hope these numbers will improve in the near future.

To sum up: have we achieved our goals, and what have we shown? We believe it is possible to build a search engine for time-oriented data, built on a time-partitioning model, that can deal with the high-cardinality data sets we're going to get; do high-speed ingest of terabytes a day without having to age anything out of the data set; handle ingest and queries at the same time; and fully utilize modern hardware: maximize the CPU (we have a threading model that can basically peg the CPUs during ingest), leverage the network, and leverage local disk reads on HDFS. We believe these things are possible, and I've been happy with our results so far. With that, this is the Rocana Search team that's worked on this, and we'll take questions now.

So the question was: we're using Avro encoding for the events in Kafka; do we only support one schema, or do we have multiple schemas? Right now we only support one schema. Our application is built around a generic event schema that we use for everything in our system. That said, we do plan for Rocana Search in particular to support multiple schemas in the future. We haven't figured out all the details of how we want to do that, or of where we want to store the extra schema information on the Lucene side of things.

The question was whether we know what the overhead of using Avro for serialization and deserialization is. We do have some numbers on that, and it's very small: deserializing is way less than one percent of one core, and CPU is mostly what you'd worry about there. The nice thing is that Avro compresses fairly well, so if you turn on Kafka compression, you can get the over the
wire size down pretty low. So we don't have a big overhead from Avro serialization and deserialization; our biggest bottleneck on ingest tends to be CPU utilization by the Lucene indexers themselves. At my previous company I benchmarked JSON parsing against Avro parsing, and Avro parsing was many times faster, like 10 to 20 times.

I think the question was about security and integration with Kerberos. That's a great question. We have Kerberos integration on the HDFS side: if you have an HDFS cluster with Kerberos enabled, we use Kerberos authentication for writing the Lucene indexes and talking to HDFS. We also have Kerberos-based authentication for RPC clients, so if you're talking to us with a search client, you can enable SASL-based RPC over that interface. The one place where we don't currently support Kerberos is Kafka itself. We build against Kafka 0.8.2, which predates Kafka's security features. We do hope to be able to upgrade to Kafka 0.9, but, as we said at the very beginning, we build an application that we have to ship to customers, so we can't always move to the latest version of things; we have to work with environments that meet the needs of our customers. We hope to move to Kafka 0.9 and support its security features on ingest, but that's pending an update to our libraries. Many of our customers are enterprise customers, and they require Kerberos integration, so we've built that in from the start, but we haven't benchmarked it yet, so we don't know how much it's going to affect query performance and throughput. We do expect Kerberos authentication to lower at least query performance, because that's the main place it comes into play in our system, but we haven't measured it yet.

Yeah, that's a great question. To repeat the question for everyone: have we thought about ways that colder data
can reduce the storage footprint compared to hot data? We have thought about it. One thing we do differently from Solr (Solr has some support for this now) is that from the beginning we've relied on HDFS alone for replication. Solr actually has two levels of replication: it replicates at the HDFS layer for the index files themselves, and you can turn on replicas above that. In fact, with Solr the only way to get read replicas is to replicate at the Solr level. That means if you have one event coming in and 2x replication, you're actually writing six copies to disk: three times on HDFS, twice. We don't do that. We do have read replicas, in the sense Michael described: if the primary node hosting a particular index is down, we just route the query to another node, and it opens the files from HDFS. One thing we've thought about for the future is warming up some of those read replicas, so that instead of picking a node and having it read from disk, some nodes are already primed for reads. Then, if you have a very high concurrent query load coming in, you can query multiple nodes in parallel. If we did that, we would keep more warm read replicas for hot, newer data, whereas for older data we would only have one primary or read replica. The other thing is that HDFS has features for controlling the replication factor and for using different encoding schemes, so that you can get the effective protection of 3x replication with a lower on-disk footprint. We haven't tested with those, but it's something we could leverage, and that's part of the reason we built on HDFS as our storage model: we didn't want to have to reinvent all of that. We want to be able to take advantage of all of the great innovations happening on the HDFS side.
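The replica arithmetic is easy to check. Solr-level replicas multiply with HDFS replication, while relying on HDFS alone keeps the count at the HDFS replication factor. For the "different encoding schemes" remark, one concrete example is HDFS erasure coding with a Reed-Solomon 6+3 layout (added in Hadoop 3), which stores 9 blocks for every 6 blocks of data. The talk doesn't say which scheme, if any, Rocana would use, and the function names are hypothetical.

```python
from fractions import Fraction


def copies_on_disk(hdfs_replication: int, app_replicas: int = 1) -> int:
    """Physical copies of each event when an app replicates on top of HDFS."""
    return hdfs_replication * app_replicas


def erasure_coding_overhead(data_blocks: int, parity_blocks: int) -> Fraction:
    """On-disk multiplier for an erasure-coding layout such as RS(6,3)."""
    return Fraction(data_blocks + parity_blocks, data_blocks)


print("Solr replicas (2) on 3x HDFS:", copies_on_disk(3, app_replicas=2))     # 6
print("HDFS 3x replication only:    ", copies_on_disk(3))                     # 3
print("HDFS RS(6,3) erasure coding: ", float(erasure_coding_overhead(6, 3)))  # 1.5
```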
Sorry, could you say that one more time? Oh, front ends. So I think you mean languages for query clients and things like that? Got it. All ingest right now is through Kafka, so you can push into Kafka with any language; what we pull out has to be in our schema for now, but again, we're going to adopt a model that can handle multiple schemas in the future. In terms of query, right now we only support Avro RPC. We may add REST at some point if the use case arises.

Yeah, agreed. And our application, Rocana Ops, comes with a web interface, which is how customers primarily access our data. The web application that runs that UI does have a REST endpoint, which you could use to query us through another API. So the two ways would be that REST API or the Avro RPC, and we're going to ship on day one with a Java-based client that speaks the Avro RPC.

So the question was: in our architecture diagrams we focus very heavily on partitioning by date and indexing by date; do we support more flexible partitioning or indexing strategies? The answer is yes. The partition strategy Michael talked about supports date-based partitions, and it can go down to hourly if you want, though most of our customers use daily. But it also supports hash-based partitions on other fields. If you have other fields that you know will appear in every query, you can enable hash-based partitioning on those fields, and that again lets you winnow down the data at the storage layer. In particular, our event schema has a timestamp, plus host, service, and location, which are strings by default, as well as an attribute map, which is basically arbitrary key-value pairs that we index.
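Hash partitioning on a field can sit underneath the time partition. This sketch assumes an event with the fields listed (timestamp, host, service, location, and an attribute map) and adds a hash bucket on `host`, so a query with `host=foo` only needs one bucket per day. The `Event` class, the bucket count, the `host_bucket=` directory naming, and MD5 as the bucketing hash are all illustrative assumptions.

```python
import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

HOST_BUCKETS = 8  # hypothetical number of hash buckets for the host field


@dataclass
class Event:
    timestamp: datetime
    host: str
    service: str
    location: str
    attributes: dict[str, str] = field(default_factory=dict)  # arbitrary key/values


def host_bucket(host: str, buckets: int = HOST_BUCKETS) -> int:
    # A stable hash (Python's built-in hash() is salted per process) so that
    # every node computes the same bucket for the same host.
    digest = hashlib.md5(host.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % buckets


def partition_path(dataset: str, event: Event) -> str:
    """Time partition first, then a hash bucket on host underneath it."""
    day = event.timestamp.astimezone(timezone.utc).strftime("year=%Y/month=%m/day=%d")
    return f"{dataset}/{day}/host_bucket={host_bucket(event.host)}"


def buckets_to_search(host: Optional[str]) -> list[int]:
    """A host=foo clause narrows each day to one bucket; otherwise scan all."""
    return [host_bucket(host)] if host is not None else list(range(HOST_BUCKETS))


e = Event(datetime(2016, 1, 2, 8, 30, tzinfo=timezone.utc), "web-01", "nginx",
          "us-east", {"status": "500"})
print(partition_path("events", e))
print(buckets_to_search("web-01"), "vs", len(buckets_to_search(None)), "buckets")
```

The trade-off is the one the answer implies: this only pays off for fields that appear in nearly every query, since a query without `host` still has to scan every bucket of every day in range.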
[Audience question about joins.] So that's a good question: do we plan to support joins between different data sets, or against other data, in the future? We haven't thought too much about that; we are focused on the search use case. That being said, one of the things Rocana does have is an embedded library that we use for data transformation, and one of the things it supports is the ability to do data transformation and lookups on ingest, and eventually on query, where it would be able to join data with small data sets. One example: a lot of incoming data has IP addresses, and you may want to do geolocation on those IP addresses using something like the MaxMind database. That's something we support today, purely on ingest; in the future we want to be able to run that transformation library over the results as they're streaming out of the query.

I think there was a question over here as well. Yeah? Mm-hmm, gotcha. So the question is, did we evaluate Elasticsearch more thoroughly, and the answer is no, not really. We've read a lot about Elasticsearch, and in particular we've read the work of a developer named Kyle Kingsbury, who goes by aphyr. He's written the "Call Me Maybe" blog series, using a tool called Jepsen to test distributed systems from a correctness perspective: basically throwing a bunch of data at systems, creating a bunch of network partitions, and asking, okay, do you lose data? Do you acknowledge writes that you then later can't serve back? Elasticsearch has had a few issues there, so based on those results we discounted Elasticsearch from the beginning. That being said, it's a great tool: if your search index doesn't have to be your data store of record, Elasticsearch is a great way to add search to your stuff.

Any more questions? Mm-hmm. Yeah, so that's a great question: do we support batch indexing,
and the answer is sort of. Kafka can actually support very, very high throughputs, so on large clusters, based on our benchmarks, there's not a huge difference between pure batch indexing, where you write directly to HDFS, and writing to Kafka. So what we're building is a MapReduce job that can read historical data sets. One of the things our application does is that, in addition to writing everything to the search index, it always writes a full copy of every event to HDFS, in what we call our "in case of emergency, break glass" data set. This tool will basically spin up a MapReduce job, or possibly a Spark job (we haven't decided which one to use), read the data in parallel, write it to Kafka in parallel, and so support high-throughput bulk loads through Kafka into our system.

[Audience question about pruning old data.] Yeah, we do have that. We have something called data lifecycle management, which in our case basically publishes messages into Kafka saying "delete data that's older than X," and we have a separate process that does that in the background. That actually exists; we have that for Solr as well. And one other thing related to that: the question was about pruning of data and also optimizing older data sets. One thing I don't think we mentioned during the presentation is that, because most of the data comes in in time order, we automatically close the index writers for older indexes as they time out. One of the things we could do in the future, but haven't done, is run a final optimization step to fully optimize those old indexes, because they're most likely never going to be written again. There is obviously delayed data, and you may have to reindex data for some particular reason, but for the most part, once older indexes are written, you're done with them. So that is something we've thought about for the future: being able to
have a background thread that optimizes old indexes so that the on-disk format is as optimal as we can get it. All right, thank you very much.
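The last answers describe two mechanisms for time-ordered indexes: closing the writer for an older partition once it stops receiving data, and a lifecycle process that deletes data older than some threshold. The sketch below is a minimal, hypothetical model of both, not Rocana's code: `StubWriter` stands in for a real index writer, the idle timeout and the injected `clock` are assumptions made for testability, and `expired_partitions` only computes which daily partitions fall outside the retention window (the talk's system instead publishes delete messages into Kafka for a background process to act on). Closed partitions are collected as candidates for the final optimization pass the speakers describe as future work.

```python
from datetime import date, timedelta


class StubWriter:
    """Hypothetical stand-in for a real index writer (e.g. one wrapping Lucene)."""

    def __init__(self, partition):
        self.partition = partition
        self.docs = []
        self.closed = False

    def add(self, doc):
        if self.closed:
            raise RuntimeError("writer is closed")
        self.docs.append(doc)

    def close(self):
        self.closed = True


class WriterPool:
    """One open writer per daily partition; writers idle past a timeout get closed."""

    def __init__(self, idle_timeout_s, clock):
        self.idle_timeout_s = idle_timeout_s
        self.clock = clock              # callable returning seconds; injected so it is testable
        self.writers = {}               # partition -> open writer
        self.last_write = {}            # partition -> time of most recent write
        self.optimize_candidates = []   # closed partitions a background task could optimize

    def write(self, partition, doc):
        if partition not in self.writers:
            # Delayed data may reopen a partition closed earlier; it is no longer final.
            if partition in self.optimize_candidates:
                self.optimize_candidates.remove(partition)
            self.writers[partition] = StubWriter(partition)
        self.writers[partition].add(doc)
        self.last_write[partition] = self.clock()

    def close_idle(self):
        """Close writers with no writes for idle_timeout_s; return the partitions closed."""
        now = self.clock()
        closed = [p for p, t in self.last_write.items() if now - t >= self.idle_timeout_s]
        for partition in closed:
            self.writers.pop(partition).close()
            del self.last_write[partition]
            self.optimize_candidates.append(partition)
        return closed


def expired_partitions(partition_days: list[date], today: date, retention_days: int) -> list[date]:
    """Days that fall outside the retention window, i.e. data 'older than X'."""
    cutoff = today - timedelta(days=retention_days)
    return sorted(d for d in partition_days if d < cutoff)
```

A background thread could call `close_idle()` periodically and then run a final optimization over `optimize_candidates`; for a Lucene-based index that step would be something like `IndexWriter.forceMerge`, though the talk does not say how such a pass would be implemented.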
