https://egghead.io/courses/save-time-avoiding-common-mistakes-using-rxjs
There are a couple reasons why this is a bad idea. First of all is that the click stream is now exposed here for any other script in the page to send events into it, so you may get confused how does this click stream work. You need to search for all of the usage of .next throughout the whole codebase. That may get confusing.
The second reason is that the event listener here is not being disposed. It's actually being attached to the document there and always staying there.
We can convert this subject to an observable and eliminate these two problems. We can do that with rx.observable.create. Here, the argument is just what we do when subscribe happens. We just write here the recipe for what should happen when subscribe happens.
For instance, we're literally just writing what happens when we subscribe. In that case, we just want to add an event listener on the [inaudible 1:26] . That event listener will send the event to the observer instead of sending it directly to the subject. We don't have a subject anymore. We just have the observer and the observable. We're going to send that event there, like this.
Obviously, we don't need this part anymore because we already have an event listener, and also the click stream now is an observable, so we don't even have this .next method available. It's only on subjects and observers.
We can remove this part and here when we subscribe to the click stream, we're going to just log out something interesting like the x-coordinate of that click. When we run this, we can click here, we can see those x-coordinates in the console.
One more thing, I also mentioned removing resources like the event listener. Here we added it to the document but we never removed it, so we can also code this logic inside the subscribe function as well.
Here, we can return unsubscribe function and this is the logic that we want to run once the user calls subscription.unsubscribe. Here we do our disposal.
For instance, when we subscribe, we get out a subscription. We can keep it like that so that later on we can clean out that subscription. For instance, if we run this code after four seconds, we can call subscription.unsubscribe. That will run the logic that is inside here.
For instance, we can do document.remove event listener, four clicks. We need to pass the reference to the same function. This function was actually anonymous. As you can see, there was no name to it. We need to have a name to function here.
That's why I'm going to extract this function and I'm going to put it over here. I'm going to call it just listener, is this anonymous function. That's what I'm registering when I add the listener. I'm going to remove that same exact function, like this.
This means that once we run this code, I can click for a while but after four seconds, I won't be able to click anymore as you can see, because the subscription was unsubscribed. This is how you can replace subjects with observables and have a lot of benefits from that.
Also, because it's low-level as well, here we are facing potential problems, for instance when we call observer.next. What if we would have made a typo where we have an extra V here for the name? Then this would throw an error.
But we need to catch the error and send it to the observer as well. Ideally we need to write like this. Try and then catch the error and then send that error as well to the observer. We need to do this because who knows?
Maybe some other operators down the chain will try to catch this error, or replace it, or something like that. As you can see, we need even more code to get it all correct. Instead of using observable.create, RX provides a lot of helpers to create observables of different types.
It happens to be for events on the dom such as clicks, we already have a helper called observable.fromEvent and here we can pass the node or the element and here we're going to pass the document and events of type click.
With this, we get the same thing that we had before and it already has that unsubscription logic to remove event listeners and so forth. It still works as we want it to, and etc. Of course, fromEvent doesn't work for all of the types of observables. Of course, if you want something from WebSockets, for instance, you're not going to use fromEvent. You're going to use something else.
But RX has many of these helpers to create observables and you should try to use them as much as possible instead of resorting to observable.create, because it's very low-level and you may do something wrong or forget some small detail. Usually you can get by creating all of the observables that you need from these helpers.
On the other hand, what's not nice is to keep track of these subscriptions. What if you have a dozen of these different subscriptions, and then you need to handle the calls to unsubscribe for each one of those?
It turns out that there are ways you can replace the manual unsubscribe with something smarter and more automatic. In our case, we want to dispose after four seconds. It turns out that we can represent this event of four seconds as an observable as well.
We can write here four stream as an observable of interval four seconds. That means that this will emit every four seconds, but we're going to take just one of those values.
It will emit after four seconds and then complete. We can make another observable here called clickUntil four stream. This will be basically the click stream, but we're going to use the takeUntil operator, and the argument here is four.
What it does this do is that it basically behaves the same as click stream, except it will complete when a four emits something. Just to make a quick marble diagram of what is going on there, if we have here click stream and four stream here and then clickUntil four stream will be this.
If we have our clicks happening here overtime as such, and then we have this we know that will emit a value after four seconds. Let's imagine that four seconds is somewhere over here, and then it's going to complete after that, because we just take one of those events.
ClickUntil four will be basically behaving like the click stream like this, except it will complete whenever this one emits, like that. The trick here is that whenever an observable completes, it will automatically unsubscribe from its resources. That's what allows us to be able to remove this part essentially. Instead of subscribing to the click stream, we're going to subscribe to clickUntil four.
We can try this out here in the console. I'm going to click for a while. After four seconds, it will not take my clicks anymore, as such. The trick here is really just to make observables complete somehow. TakeUntil is one of those operators that allows you to complete.
It's not the only operator that does that. We also have, for instance, here takeOperator that takes in number. Here as an argument, we're going to pass six there. Now, we can call this six clicks. We don't need this four observable anymore.
The logic here will be a bit different because we're doing something different. Instead of completing the click observable after four seconds, we're completing it after we have seen six clicks, like this. When it completes, it will also dispose the resources such as the event listener.
Let's try this out as well. One, two, three, four, five, six, and it no longer takes my clicks.
The lesson here is just, whenever you see a subscription, like for instance, if we would keep this subscription, and if you think that you need to unsubscribe it, then first ask yourself, "Can you make this one complete instead of using the unsubscribe?" Because then, you're going to have something that is more automatic and you don't need to worry about those un-subscription moments.
Then we can accumulate all of those number 1s as this X with the scan operator, and we can make this sort of count observable. Just to illustrate how the system works, whenever an event like response comes back from the server it registers, or whenever I click it will increment that. It's basically just counting all of the events that happen in my system, whether they are clicks or responses. Now the way that we can avoid the subject here is by identifying our original source of data, just asking ourselves where does the data come from?
We know that it comes from clicks. It's converted to the number 1, and then it's accumulated, so we know that it comes from the clicks. Let's just first write here our steps. We first identify the sources of data, and then we convert those sources of data to observables.
After we have observables for those sources, we can just compose them with all of the operators that we know. For instance, here we need to make an observable for our source data which is clicks, so we can make that with the from event, and click type, so that's our source of data.
We also know that data may come from the server as a response. We also need to convert this into an observable. Let's make that as response observable, and we can make it from a promise by calling from, and passing in that promise there. Now this is one of our sources of data as well. Now that we have these two sources of data, we can compose them in different ways.
We can make a stream here called oneObservable which will represent those number 1s that we were sending. It's basically going to be a merging of click stream and response stream, so we're basically saying that 1 is either events from click or either event from the response observable, and we're going to map each one of those to the number 1. We don't really care what is the contents of the click, or the contents of the response, we just want to convert that to the number 1.
We can get rid of our subject, and we can use here oneStream instead. This is going to work just like before. When we run this, it gets the number 1 to represent the response. As we click, it will register those as well.
If you want to make this slightly simpler, we can also cut out the middle man and just put in here straight merge, and then map. Now this countStream is basically either events from the clickstream or the response stream, and we're just mapping each one of those to the number 1, and then we're accumulating them over time.
The lesson here is try practicing by identifying where does your real data come from, and then asking yourself, "Can I wrap that in an observable?" When those things become observables like here, your code base will be more predictable because you can know where is the data coming from, and where it is flowing without having to hunt down all of the calls to subject.next method
The way that we're achieving that is by listening to clickstream. We can get those vents of clicks. We feed the client X property into the subject with .next. Now, we have a situation here which data's coming from the clickstream and it's going to the X subject.
Whenever you have the situation of source of data and then destination, that's exactly the used case for an operator -- getting data from source and sending it to some destination.
In this case, instead of having this subject, we can make an observable of these X coordinates by using operator on the clickstream. That operator in this case is mapping.
Usually, when you have a next inside a next, it usually means a map, although there are different cases where you might need a slightly different operator. In this case, we get the click event and we just map that to the property clientX.
Now, I don't even need the subscribe anymore, because I don't need to get data from clicks. The only purpose of that was to send it into the X subject, but I don't need that anymore.
We can simply subscribe to the stream of coordinates. Then we can put each of these Xs in the console log. This will work as we want it to. Each time we click, it shows the X coordinate of that event. We don't need the subject here.
The tip here is if you see a next inside a next, then something is wrong. Remember that we're avoiding subject here, because it's less save by default where observables are often able to automatically dispose resources.
That's not good, because one of the main purposes of RxJS is to solve callback hell. In this example, we have a click observable and we have a response observable. The idea is that we subscribe so that whenever a click happens, we subscribe to the user data in order to make it execute and fetch that data.
Finally, when we get that data, we just put it in console. Just to demo this, I click and it will fetch from the server this data.
Notice that this is all about putting asynchronous actions in sequence, because clicks happen asynchronously and also does the user data come asynchronously. We are putting here an order that first, we want the click to happen and then we want the response.
How can we convert this to something that looks more like RxJS? First, I'm going to note that user data here is from the closure. It's basically referring to this cost that we defined up there. We can do this in different manner by first defining another observable called response when click. This will be the click stream. We're just going to map each of the click events to user data stream.
Now, I'm using this from the closure there, and then instead of subscribing to the click, I'm going to subscribe to response when click. I know that I'm not receiving anymore these events like I did before, but now I'm receiving this user data stream, which I'm going to call as response stream.
Response stream is basically this here. That's why actually this name we should rename this to stream stream, because it's basically an observable that emits observables. This is an observable. Basically, this has been always an observable.
This is not so nice, because it's an observable of observables and actually there are methods in RxJS to flatten these things. One of them is called merge all. Merge all is one of these flattening operators. In case you have an observables of observables, in our case for instance, we have clicks which were mapped to also smaller observables like these, I have the response.
Now, it's branching out. With merge all, you're able to flatten that in order to get just a normal response observable. When we do merge all, we go back to just having a normal observable, which means that this observable emits no data, it doesn't emit observables. This is now that data and we can simplify this code like this.
Let's see if that still works. Once I click here, it will fetch some data. If click again, it fetches that data again. If you want to simplify this even more, we could use instead the operator called merge map which does simply map and merge all at once. It's like a shortcut.
That is basically how we were able to remove that callback hell. It's basically, whenever you have a subscribe inside another subscribe that can be comforted to an observable of observables which in turn can be flattened with operators like merge map or merge all and others.
Let's demonstrate that problem with this example. I have a click observable, then, I have another observable which depends on the clicks, and whenever a click happens, we will spawn a new inner observable which ticks every half a second. Then those numbers are put here in the console.
Let's see that working. Once I click here, it will spawn new inner observable, and if I click again, it will generate a new inner observable, and now, there are two of them happening concurrently. The more I click, the more it's going to spawn these new inner observables, and this just keeps on going forever.
Since these are infinite observables, these are never going to stop, so the more I click, it means that the more CPU and RAM we are consuming, and this doesn't stop. It doesn't have this cancelation logic into it.
Flatmap doesn't provide us that. But there are other operators, one of them is called switchMap, which provides you cancelation built in. It works in a similar fashion than flatMap, except it has slightly different semantics.
Here if we click, it will spawn a new inner observable, but once we click a second time, it cancelled, or basically unsubscribed from the previous one, and is now only subscribing to the most recent one. If I click again, you can see it cancelled the second one, and only took the third one.
That means that at any given time we only have inner observable ticking. We don't consume more and more RAM and CPU as we go. That means that switchMap is usually a better default choice, because it has this cancellation built in. Now that works even for the example with requests, here, we had our system with mergeMap to get the requests from the server.
Once we click here, it will give us that data, but we could have used switchMap here and it would have worked in basically the same way.
As you can see, once I click here, it will give me that data. The difference here is that if I click two times very quickly, basically a double click, then it wouldn't perform two requests. It would cancel the request for the first click, like this. As you can see, I just got one data back.
The conclusion is that when people use mergeMap, basically what they want to do is just flatten an observable of observables, but you need to realize that mergeMap is not the only flattening strategy, and if you don't know which strategy to choose you're probably better off with switchMap. By default, use switchMap, if you really know what you're doing, then go ahead and use mergeMap.
Often, we need a similar functionality to promise.all, but in our Ext JS. For instance, imagine that we're calculating the volume of a room from its components like length, width, and height.
We have these observables to represent those, and then maybe you have used zip for this purpose. Now zip will actually work in this case, it takes the member observables like these three observables and then it also takes a function which tells how do we combine those three values into one single value.
In this case, we'll actually produce the correct volume there, but with a detail that if we would have updated the width and only the width to be eight meters, for instance, then we wouldn't see 5x7x2.8 and also then 5x8x2.8, we only saw 5x7x2.8.
It didn't actually consider this value, and that's because of the way that zip works, it's basically waiting for the second value from all of these other observables, so we need to add also here six, and still it's not enough because it's waiting for the second value from this observable, so we also need to add that value there.
Now, if we run this, it will calculate the volume for the first of these values, and also for the second of these values.
As you see, zip works in this synchronized manner because it's going to calculate for the first of these, and second of these, and third of these, and it's not going to mix the first with the second, and that type of stuff. Actually, this is probably not what you wanted. Zip is very specific, but a better operator to use by default is combineLatest, which has a similar signature to zip.
We don't need to change anything here, and then combineLatest allows us to update only one of these values and then as you see, it will calculate 5x7x2.8 which gives us 98, and also 5x7x2.5 which gives us 87.5.
That's why it's called combineLatest, because it only combines the latest values from each of these member observables. Basically, only the most recently emitted values from each of these members.
The lesson here is when you're combining many observables together to produce one value, then use combineLatest by default. Only use zip when you know that the observables being combined are guaranteed to have synchronized emissions which is quite rare, to be frank.
How does do work? It's actually similar to a map operator where we take the input event and then we can simply return that same input event. Basically this done nothing. Every event that comes in will be the same that comes out, so it really does nothing. We can also include a side effect here.
Do does the same thing as this map. As you can see that allows us to add the sneaky side effect. Do is just a shortcut so that we don't need to write these lines of code. Basically do causes a sneaky side effect to happen during the data flow of this chain. This sneakiness may become a problem.
For instance, what if this part would be named as c stream for clicks and then in another file we use that c stream like this. We think that c stream is just an observable of click events, when in fact every time that we observe it, it causes this side effect. There is some sneakiness going on there.
In another sense, when we read this code it seems like data comes from this source, passes through these operators and ends in this destination. It seems like there is just one destination, but in fact there are two destinations of data where data comes from here but it also lands here as a destination in order to update the dot position.
As you can see these things are not so obvious. In fact we can refactor this code to avoid a do and we can use another subscribe. We're going to start by first noticing that this isn't anonymous observable. I didn't assign it to some const.
Also, when we call an operator this will be another observable which is anonymous. Let's give some names for these. Let's call this click stream. Then now we can subscribe to that click stream.
Notice that I didn't even change the contents here because it's basically the same for do and for subscribe. Now, we can create a response stream that will be this click stream switch mapped to this Ajax. Then we can subscribe to the response stream in order to get that data.
Notice how with this exercise where we named each of these sources of data now things are bit more explicit where we have this source of data and we subscribe to it to get that data and put it in this destination.
Then we also this source of data which depends on that source of data, and we subscribe to that one to give data to that destination. Notice also that if I replace subscribe with do this won't actually perform the side effect, because remember do is almost like a map, and as you know map will return an observable.
This will return an observable, but we're not subscribing to that one, we're just dropping it on the floor. That's why this will never happen if we have the do here. That's why we need to have a subscribe.
As you can see we got rid of our do and our code will still work in the same way. We will update the dot position and we're also going to make requests to the server. That said, there are good use cases for do, and I would recommend that you use do for debugging. We can put here whenever we get an event we can console log that event's client ex-position.
Now, when we click we also log out the ex-position here. We also do the side effects. Use do for debugging, it's very convenient, because the sneakiness of a console log is not a problem since it's just for debugging.
The lesson is, make the pipeline of sources of data and destinations of data more explicit by using subscribe more often than using do.
Maybe the data behind this endpoint is going to be different someday, that's why we have polling.
Right now, we're doing a request every two seconds. This is going to last forever. That's where pausing would be nice. While a pause and resume is possible in our Ext JS, there are some downsides to be aware of.
Ideally, we want to change the behavior of this observable, but that's not possible because this observable was defined to be like this, every two seconds, it emits an increasing number.
After it was defined, there is nothing that we can do to change that behavior. We could maybe apply operators but that's going to create new observables based on this observable. We can't actually change how this one behaves.
That said, we can apply a trick and that trick involves inserting a new observable which we'll call resume stream and this will emit false or true, so Booleans. We're going to make that as a subject. The subject here is not the important part. You could have done this with observables as well. It's only important that this will emit Booleans. False will mean to pause and true will mean to resume.
We're going to start by making it send false so that it's paused. We're going to set a timeout so that after a while, we are going to send true so that it starts, let's say, after 500 milliseconds. We're going to do that. Again, we're going to send false after, let's say, five seconds.
How do we use resume here? In this part of the code, we're going to write resume here, and then we're going to map, except we're going to use a switch map. We're going to take that Boolean, and we're going to check, is that Boolean true?
If it is, we're going to return this in our observable. If it is false, we're going to return our rx-observable.empty. I'm also going to put here at the end, do just for a console log for debugging purposes.
What does the switch map do here is that when we see a true from resume stream, then this stream will behave like this stream. When we see a false, then this stream will behave like the empty stream.
The empty stream won't emit any value, so that's why this part won't happen during the execution of this empty. While this one is executing, then we will get those events and that's how the Ajax is going to happen.
Let's try running this. Once we start, it starts pause. After 500 seconds, it does a request, and then it does another request. It stopped because five seconds kicked in and we have paused the resume. As you see here, we have requested zero. This zero came from the observable interval. After a while, we had requested one.
Now, I'm going to show something interesting where I'm going to resume again, let's say, at six seconds. We're going to see what happens. I'm going to pause at, let's say, nine seconds. We start paused and then after a while, it starts in two requests, and then we pause and then we start again. We pause at nine seconds.
Notice, we had requested zero, requested one, and then requested zero. This means that this stream here, the interval, although it behaves like this, it does zero, one and two, etc. We didn't resume and start getting two and three. When we resumed at this point here, we got zero.
That means that, as I told you before, we cannot pause and resume this stream, so that's why we didn't get the number two but we just restarted this stream. That's what happened. Once we subscribe again to this inner observable, we're going to start getting the number zero.
As you can see, we don't really truly have pause and resume for this observable, but we're just implementing the feature of pause and resume in a way that makes sense for us.
This is due to limitations with our Ext JS related to how observables fundamentally work, because after you call subscribe on an observable, you're not anymore in control of how data arrives. Observables are a one way communication of data from that observable to the observer.
We can't pause because that would mean that the observer wouldn't be able to send events back to the observable, and that would be a two way communication. That's not really the use case for observables. They are one way communication.
There are other features like EA6 generators that allow you to have two-way communication. Observables have different use cases.
The important thing is that we were able to implement this pause feature through different ways. It's not a problem that we cannot pause explicitly the specific observable. The lesson is, don't spend time trying to force observables to truly pause and resume.
This is what happens here when we subscribe. We're going to see that my age is 20, like that. My custom class here has also this setter, so I can set the age, but, as you noticed when I set it 21, nothing changes here in the behavior of the observable. This is starting to look a bit complicated.
What would we need to do in order for that age to actually appear is we would need to, for instance, set the observer of the class to this observer once we subscribe. Now we have a reference to the observer, and then we could do something like this observer should receive the next age that was just set, so maybe this works now.
Also, this is not ideal because what if we do the set age before the subscribe? Then, technically, here observer is not defined yet because it's only defined once we subscribe, so this is going to break. As we see, it broke.
It started to get complicated, and the reason is that you shouldn't usually extend the observable to make your own custom observables. There are reasons why this is bad idea. First of all, it is complicated. It's hard to get this right so that we don't get errors, such as this case.
You should avoid doing this for the same reasons that you should using Observable.create, because there you may violate the observable contract, like, for instance, what if you have an error here, and then after that try to send through observer.next()? That violates the observable contract.
Another reason is that usually observables don't carry state between different subscribers, but here we're definitely carrying this common state of the current observer throughout different subscribers. If we would subscribe more times here, we would have conflicts. We would have one subscribe trying to set the observer and the other subscribe setting it again, so there's this conflict.
As you can see, it's not the case here. We shouldn't be trying to do this. To build special observables, we just compose them with the operators that we know, not by extending the observable, because we'd get into very tricky situations.
That said, there are legitimate cases for extending the observable. One of them is, for instance, logging. Here we have an example of that. To give some context, here this observable ticks every 100 milliseconds, and it looks like this over time. Once we passed that through this chain of operators, we're multiplying each of those by 10, so we get 10, 20, 30.
We filter for only numbers bigger than 15, so we get 20 and 30. Then we count the amount of those, so we're going to get two at the end. That's why it's going to show a pop-up saying two, like that. Here we can make our own custom observable by extending that. Then we can use the lift method, which is very interesting.
We also need to extend a little bit more stuff, like we need to make a log subscriber and log operator, but the idea here is that we want to inject a special behavior throughout the whole chain. Here we want to inject console.log() whenever next happens, or also when an error happens, or when complete happens.
I recreated this observable here but now as a log observable. It's an observable, so it takes the subscribe function here. We can call observer.next() every 100 milliseconds and observer.complete(). This observable is a log observable. The interesting thing is that once you call map on a log observable, the result of this will be also a log observable.
Also, once you call filter on a log observable, you're going to get out a log observable and so forth. How did we get that is through lift. Lift allows us to inject a custom behavior throughout the chain of operators. Basically, we're creating a new observable, which is also a log observable.
That's how we get those new ones, and that's what we return here in the end, but we're telling this one here that its source is the current one, so this keyword. We're wrapping the operator, such as map, in this log operator. That's how the log operator is going to create a log of subscriber and then finally inject that custom behavior in the chain.
What we get is that, once I hit run here, it's going to log each of these operations in the console log like that. We saw here 10 as the output of map. 10 didn't pass the filter because it's less than 15. Then we saw 20 as the output of map. 20 also passes the filter, so that's why we see 20 again. Then we see 30 from the map. 30 passes the filter. We see 30 again.
Then the output of map completes. That also propagates the filter, so filter completes. Now that finally this one has completed, count will kick in, so it's going to count all of the events that happened. It saw just two, so that's why count now emits two. Then finally count completes.
It's very nice that we were able to inject this logging behavior into this chain. Notice that we didn't touch any code related to this part. We only changed what was the source here, what was this observable. If we go back to this, as you see, now we don't have logging anymore.
All that was needed was just to change this source, because this log observable will propagate down this chain. That's one good use case when you should extend the observable -- when you want to inject a custom behavior throughout all of the chain of operators.
That said, it's not that common to extend the observable for this purpose, and it's also rather tricky because you need to make your own log operator that wraps other operators and also a log subscriber. You need to know some bits of the internal logic in RxJS.
Overall, I recommend that you don't extend the observable class but just stick to operators that allow you to compose and easily build observables.
The simplest example of this is when you have a clock observable that ticks every second, and we have six of those events. Now we have here two subscribers, a and b. Once we run that, each of those are going to get a different execution. If this is familiar to you, this is the usual problem.
We solve that by sharing that execution. Here, we can add .share() at the end, and now we're going to have a common execution of this clock observable shared to these two different subscribers. Now they're going to see the same events at the same time, like that. It's easy to stick to .share() without knowing what it does. I've seen cases where everything is .shared() where it doesn't need to be.
In order to avoid sharing everything, it's easy to remember that subscribe means invoke the execution of this collection so we can observe it. Without .share(), we're saying here, "Please, I want the values that come from this collection and deliver them here." Of course, if we say subscribe twice, we're invoking two different executions of that.
If you keep this in mind, it makes it much easier, because when we add .share() here, we're saying, "Instead of invoking a new execution, please share the execution of the observable if it exists."
Let's look at a bit more difficult example where it may be tempting to .share() everything. Here we also have a clock observable, and we have another observable called random numbers. We're mapping each of those to a random number. Then we have two other observables -- one of them gets random numbers smaller than 50, and the other gets random numbers larger than 50.
ToArray() simply collects all of them in an array so that we see all of them together at the end when it completes. Let's run this and see what happens. The clock is generating events over time. Then we're mapping each of those to a random number. When that completes, we collect all of those numbers into small and large collections.
Notice we have some problems here. We have four numbers in total in small and large category, but we had originally six random numbers, as you can see here. Also, these numbers don't actually occur in this original collection, and neither does 57.84 happen here. This is very strange. You may find this type of behavior in an RxJS app, so it's usually the case that we're not sharing executions here.
What's going on here? First of all, when we do subscribe, remember we're invoking, "Please, give me an execution of this collection." When we do this, we're going to look here, "Give me an execution of this collection. Give me an execution of that collection. Give me an execution of this random number," which will finally ask for an execution of the clock.
This is going to have its own execution. Each of these maps will be totally exclusive for large number subscriber and the same thing for small number and random number. We essentially have here three executions of the clock and three executions of the random number. That's why we have all of these different random numbers.
It's easy to instead put .share() here everywhere. Sometimes people do this like that in an attempt to ignore where the source is and try to share everything. To some extent, this may solve the problem. As we see here, all of these numbers correspond there, and they're the same, but this is an overkill, and we don't need to do this. We can just share what is necessary.
Let's look at what is it that we need to share exactly. It all has to do with what kind of side effects happen once we subscribe or once something happens in this operator. For instance, this operator will always give out the same output for the same input. For instance, if we give x is 20 here, it's always going to say true.
On the other hand, when we invoke this function multiple times, we get different numbers every time. This is a side effect. We probably want to share this part, for instance. Also, once we subscribe to an interval, you know that internally it will do setInterval(). That's also like a side effect because it's creating a pseudo-thread. It's creating that thing, so it's not so pure.
We could probably already conclude that this part needs to be shared, because once we call setInterval(), we want that to be shared. Then take(6) will always do the same thing, so we don't need to share that. That's not enough, because let's see what happens.
We get the numbers 61, 8, 3, 85, but those numbers are not corresponding here. The reason for that is that once we call a small number subscribe(), we're saying, "Please give me the execution of this." Then it's going to invoke the execution of this, and then finally invoke the execution of this, and invoke the execution of this map based on this clock.
Even though we're going to share the same setInterval(), but we're not sharing this map, so each of these subscribers is going to get a different execution of this map operation. That's why we also need to share this part. Then, once we do that, we see 81, 30, 60, 76, and those numbers appear here.
It's a good idea not to call share() everywhere because every time you call share() you're actually creating a new subject, and that might cause maybe problems with garbage collection. It's generally wise not to do operations that we don't need to do in the first place.
The tip here in order to avoid cold and hot problems is to remember that subscribe means invoke an execution of this collection, and then all of these executions are changed throughout these operators. Some of those executions we want to share, such as, "This random number, I wanna share it," or, "The setInterval() that's inside this creation, I wanna share that as well."
The other bits are going to behave exactly the same way no matter if you share them or not. That's why we don't do it. Over time, you get to develop a sense of what should be shared and what should not be shared, depending on these side effects that may occur.
Convert RxJS Subjects to Observables
The use of RxJS Subjects is common, but not without problems. In this lesson we will see how they can be usually safely replaced with plain Observables.
Sometimes you may write code in RxJS that uses subjects. While subjects have a legitimate use cases, most of the times, they are actually unnecessary. In this case, for instance, we have a subject called click stream. Whenever a click event happens in this event listener, we will send that event into the subject using the .next method.There are a couple reasons why this is a bad idea. First of all is that the click stream is now exposed here for any other script in the page to send events into it, so you may get confused how does this click stream work. You need to search for all of the usage of .next throughout the whole codebase. That may get confusing.
The second reason is that the event listener here is not being disposed. It's actually being attached to the document there and always staying there.
We can convert this subject to an observable and eliminate these two problems. We can do that with rx.observable.create. Here, the argument is just what we do when subscribe happens. We just write here the recipe for what should happen when subscribe happens.
For instance, we're literally just writing what happens when we subscribe. In that case, we just want to add an event listener on the [inaudible 1:26] . That event listener will send the event to the observer instead of sending it directly to the subject. We don't have a subject anymore. We just have the observer and the observable. We're going to send that event there, like this.
Obviously, we don't need this part anymore because we already have an event listener, and also the click stream now is an observable, so we don't even have this .next method available. It's only on subjects and observers.
We can remove this part and here when we subscribe to the click stream, we're going to just log out something interesting like the x-coordinate of that click. When we run this, we can click here, we can see those x-coordinates in the console.
One more thing, I also mentioned removing resources like the event listener. Here we added it to the document but we never removed it, so we can also code this logic inside the subscribe function as well.
Here, we can return unsubscribe function and this is the logic that we want to run once the user calls subscription.unsubscribe. Here we do our disposal.
For instance, when we subscribe, we get out a subscription. We can keep it like that so that later on we can clean out that subscription. For instance, if we run this code after four seconds, we can call subscription.unsubscribe. That will run the logic that is inside here.
For instance, we can do document.remove event listener, four clicks. We need to pass the reference to the same function. This function was actually anonymous. As you can see, there was no name to it. We need to have a name to function here.
That's why I'm going to extract this function and I'm going to put it over here. I'm going to call it just listener, is this anonymous function. That's what I'm registering when I add the listener. I'm going to remove that same exact function, like this.
This means that once we run this code, I can click for a while but after four seconds, I won't be able to click anymore as you can see, because the subscription was unsubscribed. This is how you can replace subjects with observables and have a lot of benefits from that.
Replace Observable.create with Observable creation helpers
In this lesson we will learn about potential problems that may arise when using Observable.create, a low-level function for creating Observables. In its place, we will use easier helper functions that create Observables in a safe manner.
With observable.create we can control what happens when we subscribe to the observable and what happens when we unsubscribe. That said, you usually don't need observable.create. Because it's very low-level, we took a lot of lines of code here just to get an observable of clicks.Also, because it's low-level as well, here we are facing potential problems, for instance when we call observer.next. What if we would have made a typo where we have an extra V here for the name? Then this would throw an error.
But we need to catch the error and send it to the observer as well. Ideally we need to write like this. Try and then catch the error and then send that error as well to the observer. We need to do this because who knows?
Maybe some other operators down the chain will try to catch this error, or replace it, or something like that. As you can see, we need even more code to get it all correct. Instead of using observable.create, RX provides a lot of helpers to create observables of different types.
It happens to be for events on the dom such as clicks, we already have a helper called observable.fromEvent and here we can pass the node or the element and here we're going to pass the document and events of type click.
With this, we get the same thing that we had before and it already has that unsubscription logic to remove event listeners and so forth. It still works as we want it to, and etc. Of course, fromEvent doesn't work for all of the types of observables. Of course, if you want something from WebSockets, for instance, you're not going to use fromEvent. You're going to use something else.
But RX has many of these helpers to create observables and you should try to use them as much as possible instead of resorting to observable.create, because it's very low-level and you may do something wrong or forget some small detail. Usually you can get by creating all of the observables that you need from these helpers.
Use takeUntil instead of manually unsubscribing from Observables
Manually unsubscribing from subscriptions is safe, but tedious and error-prone. This lesson will teach us about the takeUntil operator and its utility to make unsubscribing automatic.
It's nice in RxJS that once you call unsubscribe, those resources such as event listeners attached will be disposed.On the other hand, what's not nice is to keep track of these subscriptions. What if you have a dozen of these different subscriptions, and then you need to handle the calls to unsubscribe for each one of those?
It turns out that there are ways you can replace the manual unsubscribe with something smarter and more automatic. In our case, we want to dispose after four seconds. It turns out that we can represent this event of four seconds as an observable as well.
We can write here four stream as an observable of interval four seconds. That means that this will emit every four seconds, but we're going to take just one of those values.
It will emit after four seconds and then complete. We can make another observable here called clickUntil four stream. This will be basically the click stream, but we're going to use the takeUntil operator, and the argument here is four.
What it does this do is that it basically behaves the same as click stream, except it will complete when a four emits something. Just to make a quick marble diagram of what is going on there, if we have here click stream and four stream here and then clickUntil four stream will be this.
If we have our clicks happening here overtime as such, and then we have this we know that will emit a value after four seconds. Let's imagine that four seconds is somewhere over here, and then it's going to complete after that, because we just take one of those events.
ClickUntil four will be basically behaving like the click stream like this, except it will complete whenever this one emits, like that. The trick here is that whenever an observable completes, it will automatically unsubscribe from its resources. That's what allows us to be able to remove this part essentially. Instead of subscribing to the click stream, we're going to subscribe to clickUntil four.
We can try this out here in the console. I'm going to click for a while. After four seconds, it will not take my clicks anymore, as such. The trick here is really just to make observables complete somehow. TakeUntil is one of those operators that allows you to complete.
It's not the only operator that does that. We also have, for instance, here takeOperator that takes in number. Here as an argument, we're going to pass six there. Now, we can call this six clicks. We don't need this four observable anymore.
The logic here will be a bit different because we're doing something different. Instead of completing the click observable after four seconds, we're completing it after we have seen six clicks, like this. When it completes, it will also dispose the resources such as the event listener.
Let's try this out as well. One, two, three, four, five, six, and it no longer takes my clicks.
The lesson here is just, whenever you see a subscription, like for instance, if we would keep this subscription, and if you think that you need to unsubscribe it, then first ask yourself, "Can you make this one complete instead of using the unsubscribe?" Because then, you're going to have something that is more automatic and you don't need to worry about those un-subscription moments.
Convert an underlying source of data into an Observable
While there are many cases where we often believe that an RxJS Subject is necessary, there is a way of avoiding them. In this lesson we will see how to identify the underlying source of data and convert it into an Observable, essentially eliminating the use of an error-prone Subject.
There will be times where you think you need an RX subject, and still it is possible to avoid them. In this case we have a subject, just a generic one without a proper name, and we're implementing a simplified analytic system. Basically, whenever a click happens, we're sending in the event of number 1 into that subject, or whenever a request comes back from the server as response, we're also sending in a number 1 to that subject.Then we can accumulate all of those number 1s as this X with the scan operator, and we can make this sort of count observable. Just to illustrate how the system works, whenever an event like response comes back from the server it registers, or whenever I click it will increment that. It's basically just counting all of the events that happen in my system, whether they are clicks or responses. Now the way that we can avoid the subject here is by identifying our original source of data, just asking ourselves where does the data come from?
We know that it comes from clicks. It's converted to the number 1, and then it's accumulated, so we know that it comes from the clicks. Let's just first write here our steps. We first identify the sources of data, and then we convert those sources of data to observables.
After we have observables for those sources, we can just compose them with all of the operators that we know. For instance, here we need to make an observable for our source data which is clicks, so we can make that with the from event, and click type, so that's our source of data.
We also know that data may come from the server as a response. We also need to convert this into an observable. Let's make that as response observable, and we can make it from a promise by calling from, and passing in that promise there. Now this is one of our sources of data as well. Now that we have these two sources of data, we can compose them in different ways.
We can make a stream here called oneObservable which will represent those number 1s that we were sending. It's basically going to be a merging of click stream and response stream, so we're basically saying that 1 is either events from click or either event from the response observable, and we're going to map each one of those to the number 1. We don't really care what is the contents of the click, or the contents of the response, we just want to convert that to the number 1.
We can get rid of our subject, and we can use here oneStream instead. This is going to work just like before. When we run this, it gets the number 1 to represent the response. As we click, it will register those as well.
If you want to make this slightly simpler, we can also cut out the middle man and just put in here straight merge, and then map. Now this countStream is basically either events from the clickstream or the response stream, and we're just mapping each one of those to the number 1, and then we're accumulating them over time.
The lesson here is try practicing by identifying where does your real data come from, and then asking yourself, "Can I wrap that in an observable?" When those things become observables like here, your code base will be more predictable because you can know where is the data coming from, and where it is flowing without having to hunt down all of the calls to subject.next method
Use the map operator instead of firing events on a Subject
In this lesson we will learn how to replace excessive subscribing and Subject emissions with simple uses of the map operator, simplifying our code and making it less bug-prone.
Whenever you see a next method inside a next callback when subscribing, then something is wrong. I can't imagine a case where you need this. What's going on here is that we have subject for X coordinates. Whenever a click happens, we want this subject to emit the X coordinate of that click event.The way that we're achieving that is by listening to clickstream. We can get those vents of clicks. We feed the client X property into the subject with .next. Now, we have a situation here which data's coming from the clickstream and it's going to the X subject.
Whenever you have the situation of source of data and then destination, that's exactly the used case for an operator -- getting data from source and sending it to some destination.
In this case, instead of having this subject, we can make an observable of these X coordinates by using operator on the clickstream. That operator in this case is mapping.
Usually, when you have a next inside a next, it usually means a map, although there are different cases where you might need a slightly different operator. In this case, we get the click event and we just map that to the property clientX.
Now, I don't even need the subscribe anymore, because I don't need to get data from clicks. The only purpose of that was to send it into the X subject, but I don't need that anymore.
We can simply subscribe to the stream of coordinates. Then we can put each of these Xs in the console log. This will work as we want it to. Each time we click, it shows the X coordinate of that event. We don't need the subject here.
The tip here is if you see a next inside a next, then something is wrong. Remember that we're avoiding subject here, because it's less save by default where observables are often able to automatically dispose resources.
Use flattening operators instead of nested subscriptions
We are going to see how to avoid virtually any case of a subscribe happening inside another subscribe by replacing it with a flattening operator such as flatMap or mergeAll.
When you see a subscribe inside another subscribe call, then something is wrong as well. The reason why I say wrong is because this is starting to look like call back hell. If you remember call back hell, it happens whenever you have a callback inside another callback inside another callback, and you usually get something pyramid shaped like this code.That's not good, because one of the main purposes of RxJS is to solve callback hell. In this example, we have a click observable and we have a response observable. The idea is that we subscribe so that whenever a click happens, we subscribe to the user data in order to make it execute and fetch that data.
Finally, when we get that data, we just put it in console. Just to demo this, I click and it will fetch from the server this data.
Notice that this is all about putting asynchronous actions in sequence, because clicks happen asynchronously and also does the user data come asynchronously. We are putting here an order that first, we want the click to happen and then we want the response.
How can we convert this to something that looks more like RxJS? First, I'm going to note that user data here is from the closure. It's basically referring to this cost that we defined up there. We can do this in different manner by first defining another observable called response when click. This will be the click stream. We're just going to map each of the click events to user data stream.
Now, I'm using this from the closure there, and then instead of subscribing to the click, I'm going to subscribe to response when click. I know that I'm not receiving anymore these events like I did before, but now I'm receiving this user data stream, which I'm going to call as response stream.
Response stream is basically this here. That's why actually this name we should rename this to stream stream, because it's basically an observable that emits observables. This is an observable. Basically, this has been always an observable.
This is not so nice, because it's an observable of observables and actually there are methods in RxJS to flatten these things. One of them is called merge all. Merge all is one of these flattening operators. In case you have an observables of observables, in our case for instance, we have clicks which were mapped to also smaller observables like these, I have the response.
Now, it's branching out. With merge all, you're able to flatten that in order to get just a normal response observable. When we do merge all, we go back to just having a normal observable, which means that this observable emits no data, it doesn't emit observables. This is now that data and we can simplify this code like this.
Let's see if that still works. Once I click here, it will fetch some data. If click again, it fetches that data again. If you want to simplify this even more, we could use instead the operator called merge map which does simply map and merge all at once. It's like a shortcut.
That is basically how we were able to remove that callback hell. It's basically, whenever you have a subscribe inside another subscribe that can be comforted to an observable of observables which in turn can be flattened with operators like merge map or merge all and others.
Use switchMap to avoid leaks when flattening
While flatMap is popular and convenient for flattening higher-order Observables, it can introduce new kinds of bugs related to subscriptions. In this lesson we will see how switchMap is a sensible default that avoids common bugs.
The merge map operator, also known as flatMap, either name works, is quite impotent and common for solving some asynchronous issues. That said, it can create some other problems if you don't use it in the correct cases.Let's demonstrate that problem with this example. I have a click observable, then, I have another observable which depends on the clicks, and whenever a click happens, we will spawn a new inner observable which ticks every half a second. Then those numbers are put here in the console.
Let's see that working. Once I click here, it will spawn new inner observable, and if I click again, it will generate a new inner observable, and now, there are two of them happening concurrently. The more I click, the more it's going to spawn these new inner observables, and this just keeps on going forever.
Since these are infinite observables, these are never going to stop, so the more I click, it means that the more CPU and RAM we are consuming, and this doesn't stop. It doesn't have this cancelation logic into it.
Flatmap doesn't provide us that. But there are other operators, one of them is called switchMap, which provides you cancelation built in. It works in a similar fashion than flatMap, except it has slightly different semantics.
Here if we click, it will spawn a new inner observable, but once we click a second time, it cancelled, or basically unsubscribed from the previous one, and is now only subscribing to the most recent one. If I click again, you can see it cancelled the second one, and only took the third one.
That means that at any given time we only have inner observable ticking. We don't consume more and more RAM and CPU as we go. That means that switchMap is usually a better default choice, because it has this cancellation built in. Now that works even for the example with requests, here, we had our system with mergeMap to get the requests from the server.
Once we click here, it will give us that data, but we could have used switchMap here and it would have worked in basically the same way.
As you can see, once I click here, it will give me that data. The difference here is that if I click two times very quickly, basically a double click, then it wouldn't perform two requests. It would cancel the request for the first click, like this. As you can see, I just got one data back.
The conclusion is that when people use mergeMap, basically what they want to do is just flatten an observable of observables, but you need to realize that mergeMap is not the only flattening strategy, and if you don't know which strategy to choose you're probably better off with switchMap. By default, use switchMap, if you really know what you're doing, then go ahead and use mergeMap.
Replace zip with combineLatest when combining sources of data
This lesson will highlight the true purpose of the zip operator, and how uncommon its use cases are. In its place, we will learn how to use the combineLatest operator.
Sometimes in our Ext JS, we need to wait until many observables have emitted a value, to then combine those values together. This is a common use case with promises. Actually, there is a method called promise.all, which I hope is familiar to you.Often, we need a similar functionality to promise.all, but in our Ext JS. For instance, imagine that we're calculating the volume of a room from its components like length, width, and height.
We have these observables to represent those, and then maybe you have used zip for this purpose. Now zip will actually work in this case, it takes the member observables like these three observables and then it also takes a function which tells how do we combine those three values into one single value.
In this case, we'll actually produce the correct volume there, but with a detail that if we would have updated the width and only the width to be eight meters, for instance, then we wouldn't see 5x7x2.8 and also then 5x8x2.8, we only saw 5x7x2.8.
It didn't actually consider this value, and that's because of the way that zip works, it's basically waiting for the second value from all of these other observables, so we need to add also here six, and still it's not enough because it's waiting for the second value from this observable, so we also need to add that value there.
Now, if we run this, it will calculate the volume for the first of these values, and also for the second of these values.
As you see, zip works in this synchronized manner because it's going to calculate for the first of these, and second of these, and third of these, and it's not going to mix the first with the second, and that type of stuff. Actually, this is probably not what you wanted. Zip is very specific, but a better operator to use by default is combineLatest, which has a similar signature to zip.
We don't need to change anything here, and then combineLatest allows us to update only one of these values and then as you see, it will calculate 5x7x2.8 which gives us 98, and also 5x7x2.5 which gives us 87.5.
That's why it's called combineLatest, because it only combines the latest values from each of these member observables. Basically, only the most recently emitted values from each of these members.
The lesson here is when you're combining many observables together to produce one value, then use combineLatest by default. Only use zip when you know that the observables being combined are guaranteed to have synchronized emissions which is quite rare, to be frank.
Move important side effects from do() to subscribe()
The do() operator allows us to perform side effects in the chain of operators. However, there are limited use cases for do(). We will learn what those use cases are and how to use subscribe() most of the times.
If you haven't heard of the do operator it allows us to perform a side effect in a specific point here in the chain of operators. In this example a click here will update the dot position but it will also cause request to happen to the server and get the response back.How does do work? It's actually similar to a map operator where we take the input event and then we can simply return that same input event. Basically this done nothing. Every event that comes in will be the same that comes out, so it really does nothing. We can also include a side effect here.
Do does the same thing as this map. As you can see that allows us to add the sneaky side effect. Do is just a shortcut so that we don't need to write these lines of code. Basically do causes a sneaky side effect to happen during the data flow of this chain. This sneakiness may become a problem.
For instance, what if this part would be named as c stream for clicks and then in another file we use that c stream like this. We think that c stream is just an observable of click events, when in fact every time that we observe it, it causes this side effect. There is some sneakiness going on there.
In another sense, when we read this code it seems like data comes from this source, passes through these operators and ends in this destination. It seems like there is just one destination, but in fact there are two destinations of data where data comes from here but it also lands here as a destination in order to update the dot position.
As you can see these things are not so obvious. In fact we can refactor this code to avoid a do and we can use another subscribe. We're going to start by first noticing that this isn't anonymous observable. I didn't assign it to some const.
Also, when we call an operator this will be another observable which is anonymous. Let's give some names for these. Let's call this click stream. Then now we can subscribe to that click stream.
Notice that I didn't even change the contents here because it's basically the same for do and for subscribe. Now, we can create a response stream that will be this click stream switch mapped to this Ajax. Then we can subscribe to the response stream in order to get that data.
Notice how with this exercise where we named each of these sources of data now things are bit more explicit where we have this source of data and we subscribe to it to get that data and put it in this destination.
Then we also this source of data which depends on that source of data, and we subscribe to that one to give data to that destination. Notice also that if I replace subscribe with do this won't actually perform the side effect, because remember do is almost like a map, and as you know map will return an observable.
This will return an observable, but we're not subscribing to that one, we're just dropping it on the floor. That's why this will never happen if we have the do here. That's why we need to have a subscribe.
As you can see we got rid of our do and our code will still work in the same way. We will update the dot position and we're also going to make requests to the server. That said, there are good use cases for do, and I would recommend that you use do for debugging. We can put here whenever we get an event we can console log that event's client ex-position.
Now, when we click we also log out the ex-position here. We also do the side effects. Use do for debugging, it's very convenient, because the sneakiness of a console log is not a problem since it's just for debugging.
The lesson is, make the pipeline of sources of data and destinations of data more explicit by using subscribe more often than using do.
Implement pause and resume feature correctly through RxJS
Eventually you will feel the need for pausing the observation of an Observable and resuming it later. In this lesson we will learn about use cases where pausing is possible, and what to do when pausing is impossible.
Sometimes you will need to create a pause and resume interaction in your app. For instance, imagine that you're implementing polling where every two seconds, you want to make a request to this endpoint.Maybe the data behind this endpoint is going to be different someday, that's why we have polling.
Right now, we're doing a request every two seconds. This is going to last forever. That's where pausing would be nice. While a pause and resume is possible in our Ext JS, there are some downsides to be aware of.
Ideally, we want to change the behavior of this observable, but that's not possible because this observable was defined to be like this, every two seconds, it emits an increasing number.
After it was defined, there is nothing that we can do to change that behavior. We could maybe apply operators but that's going to create new observables based on this observable. We can't actually change how this one behaves.
That said, we can apply a trick and that trick involves inserting a new observable which we'll call resume stream and this will emit false or true, so Booleans. We're going to make that as a subject. The subject here is not the important part. You could have done this with observables as well. It's only important that this will emit Booleans. False will mean to pause and true will mean to resume.
We're going to start by making it send false so that it's paused. We're going to set a timeout so that after a while, we are going to send true so that it starts, let's say, after 500 milliseconds. We're going to do that. Again, we're going to send false after, let's say, five seconds.
How do we use resume here? In this part of the code, we're going to write resume here, and then we're going to map, except we're going to use a switch map. We're going to take that Boolean, and we're going to check, is that Boolean true?
If it is, we're going to return this in our observable. If it is false, we're going to return our rx-observable.empty. I'm also going to put here at the end, do just for a console log for debugging purposes.
What does the switch map do here is that when we see a true from resume stream, then this stream will behave like this stream. When we see a false, then this stream will behave like the empty stream.
The empty stream won't emit any value, so that's why this part won't happen during the execution of this empty. While this one is executing, then we will get those events and that's how the Ajax is going to happen.
Let's try running this. Once we start, it starts pause. After 500 seconds, it does a request, and then it does another request. It stopped because five seconds kicked in and we have paused the resume. As you see here, we have requested zero. This zero came from the observable interval. After a while, we had requested one.
Now, I'm going to show something interesting where I'm going to resume again, let's say, at six seconds. We're going to see what happens. I'm going to pause at, let's say, nine seconds. We start paused and then after a while, it starts in two requests, and then we pause and then we start again. We pause at nine seconds.
Notice, we had requested zero, requested one, and then requested zero. This means that this stream here, the interval, although it behaves like this, it does zero, one and two, etc. We didn't resume and start getting two and three. When we resumed at this point here, we got zero.
That means that, as I told you before, we cannot pause and resume this stream, so that's why we didn't get the number two but we just restarted this stream. That's what happened. Once we subscribe again to this inner observable, we're going to start getting the number zero.
As you can see, we don't really truly have pause and resume for this observable, but we're just implementing the feature of pause and resume in a way that makes sense for us.
This is due to limitations with our Ext JS related to how observables fundamentally work, because after you call subscribe on an observable, you're not anymore in control of how data arrives. Observables are a one way communication of data from that observable to the observer.
We can't pause because that would mean that the observer wouldn't be able to send events back to the observable, and that would be a two way communication. That's not really the use case for observables. They are one way communication.
There are other features like EA6 generators that allow you to have two-way communication. Observables have different use cases.
The important thing is that we were able to implement this pause feature through different ways. It's not a problem that we cannot pause explicitly the specific observable. The lesson is, don't spend time trying to force observables to truly pause and resume.
Know when to extend the Observable class
The Observable in RxJS is a JavaScript class, and can be extended. In this lesson we will learn about those cases where extending the Observable class becomes a problem, and what cases where it makes sense to extend it.
The observable in RxJS is a class. For that reason, it is possible to extend it and make subclasses, such as my age observable here, for instance. My age observable has a constructor here that takes the initial age-- we're passing here 20 -- and then it will set that as the field here, age. When we subscribe to this custom observable, we're sending that initial age.This is what happens here when we subscribe. We're going to see that my age is 20, like that. My custom class here has also this setter, so I can set the age, but, as you noticed when I set it 21, nothing changes here in the behavior of the observable. This is starting to look a bit complicated.
What would we need to do in order for that age to actually appear is we would need to, for instance, set the observer of the class to this observer once we subscribe. Now we have a reference to the observer, and then we could do something like this observer should receive the next age that was just set, so maybe this works now.
Also, this is not ideal because what if we do the set age before the subscribe? Then, technically, here observer is not defined yet because it's only defined once we subscribe, so this is going to break. As we see, it broke.
It started to get complicated, and the reason is that you shouldn't usually extend the observable to make your own custom observables. There are reasons why this is bad idea. First of all, it is complicated. It's hard to get this right so that we don't get errors, such as this case.
You should avoid doing this for the same reasons that you should using Observable.create, because there you may violate the observable contract, like, for instance, what if you have an error here, and then after that try to send through observer.next()? That violates the observable contract.
Another reason is that usually observables don't carry state between different subscribers, but here we're definitely carrying this common state of the current observer throughout different subscribers. If we would subscribe more times here, we would have conflicts. We would have one subscribe trying to set the observer and the other subscribe setting it again, so there's this conflict.
As you can see, it's not the case here. We shouldn't be trying to do this. To build special observables, we just compose them with the operators that we know, not by extending the observable, because we'd get into very tricky situations.
That said, there are legitimate cases for extending the observable. One of them is, for instance, logging. Here we have an example of that. To give some context, here this observable ticks every 100 milliseconds, and it looks like this over time. Once we passed that through this chain of operators, we're multiplying each of those by 10, so we get 10, 20, 30.
We filter for only numbers bigger than 15, so we get 20 and 30. Then we count the amount of those, so we're going to get two at the end. That's why it's going to show a pop-up saying two, like that. Here we can make our own custom observable by extending that. Then we can use the lift method, which is very interesting.
We also need to extend a little bit more stuff, like we need to make a log subscriber and log operator, but the idea here is that we want to inject a special behavior throughout the whole chain. Here we want to inject console.log() whenever next happens, or also when an error happens, or when complete happens.
I recreated this observable here but now as a log observable. It's an observable, so it takes the subscribe function here. We can call observer.next() every 100 milliseconds and observer.complete(). This observable is a log observable. The interesting thing is that once you call map on a log observable, the result of this will be also a log observable.
Also, once you call filter on a log observable, you're going to get out a log observable and so forth. How did we get that is through lift. Lift allows us to inject a custom behavior throughout the chain of operators. Basically, we're creating a new observable, which is also a log observable.
That's how we get those new ones, and that's what we return here in the end, but we're telling this one here that its source is the current one, so this keyword. We're wrapping the operator, such as map, in this log operator. That's how the log operator is going to create a log of subscriber and then finally inject that custom behavior in the chain.
What we get is that, once I hit run here, it's going to log each of these operations in the console log like that. We saw here 10 as the output of map. 10 didn't pass the filter because it's less than 15. Then we saw 20 as the output of map. 20 also passes the filter, so that's why we see 20 again. Then we see 30 from the map. 30 passes the filter. We see 30 again.
Then the output of map completes. That also propagates the filter, so filter completes. Now that finally this one has completed, count will kick in, so it's going to count all of the events that happened. It saw just two, so that's why count now emits two. Then finally count completes.
It's very nice that we were able to inject this logging behavior into this chain. Notice that we didn't touch any code related to this part. We only changed what was the source here, what was this observable. If we go back to this, as you see, now we don't have logging anymore.
All that was needed was just to change this source, because this log observable will propagate down this chain. That's one good use case when you should extend the observable -- when you want to inject a custom behavior throughout all of the chain of operators.
That said, it's not that common to extend the observable for this purpose, and it's also rather tricky because you need to make your own log operator that wraps other operators and also a log subscriber. You need to know some bits of the internal logic in RxJS.
Overall, I recommend that you don't extend the observable class but just stick to operators that allow you to compose and easily build observables.
Make Observables hot only where necessary
Operators like publish(), refCount(), share() make it easy to convert a cold Observable to a hot one, and are often necessary to get some feature done. In this lesson we will learn when exactly do we need to convert to hot, and when can we leave the Observable cold.
If you have used RxJS long enough, you have probably heard of the idea of cold and hot observables. It can be confusing, but we're talking about how to make a shared execution of the same observable from multiple subscribers.The simplest example of this is when you have a clock observable that ticks every second, and we have six of those events. Now we have here two subscribers, a and b. Once we run that, each of those are going to get a different execution. If this is familiar to you, this is the usual problem.
We solve that by sharing that execution. Here, we can add .share() at the end, and now we're going to have a common execution of this clock observable shared to these two different subscribers. Now they're going to see the same events at the same time, like that. It's easy to stick to .share() without knowing what it does. I've seen cases where everything is .shared() where it doesn't need to be.
In order to avoid sharing everything, it's easy to remember that subscribe means invoke the execution of this collection so we can observe it. Without .share(), we're saying here, "Please, I want the values that come from this collection and deliver them here." Of course, if we say subscribe twice, we're invoking two different executions of that.
If you keep this in mind, it makes it much easier, because when we add .share() here, we're saying, "Instead of invoking a new execution, please share the execution of the observable if it exists."
Let's look at a bit more difficult example where it may be tempting to .share() everything. Here we also have a clock observable, and we have another observable called random numbers. We're mapping each of those to a random number. Then we have two other observables -- one of them gets random numbers smaller than 50, and the other gets random numbers larger than 50.
ToArray() simply collects all of them in an array so that we see all of them together at the end when it completes. Let's run this and see what happens. The clock is generating events over time. Then we're mapping each of those to a random number. When that completes, we collect all of those numbers into small and large collections.
Notice we have some problems here. We have four numbers in total in small and large category, but we had originally six random numbers, as you can see here. Also, these numbers don't actually occur in this original collection, and neither does 57.84 happen here. This is very strange. You may find this type of behavior in an RxJS app, so it's usually the case that we're not sharing executions here.
What's going on here? First of all, when we do subscribe, remember we're invoking, "Please, give me an execution of this collection." When we do this, we're going to look here, "Give me an execution of this collection. Give me an execution of that collection. Give me an execution of this random number," which will finally ask for an execution of the clock.
This is going to have its own execution. Each of these maps will be totally exclusive for large number subscriber and the same thing for small number and random number. We essentially have here three executions of the clock and three executions of the random number. That's why we have all of these different random numbers.
It's easy to instead put .share() here everywhere. Sometimes people do this like that in an attempt to ignore where the source is and try to share everything. To some extent, this may solve the problem. As we see here, all of these numbers correspond there, and they're the same, but this is an overkill, and we don't need to do this. We can just share what is necessary.
Let's look at what is it that we need to share exactly. It all has to do with what kind of side effects happen once we subscribe or once something happens in this operator. For instance, this operator will always give out the same output for the same input. For instance, if we give x is 20 here, it's always going to say true.
On the other hand, when we invoke this function multiple times, we get different numbers every time. This is a side effect. We probably want to share this part, for instance. Also, once we subscribe to an interval, you know that internally it will do setInterval(). That's also like a side effect because it's creating a pseudo-thread. It's creating that thing, so it's not so pure.
We could probably already conclude that this part needs to be shared, because once we call setInterval(), we want that to be shared. Then take(6) will always do the same thing, so we don't need to share that. That's not enough, because let's see what happens.
We get the numbers 61, 8, 3, 85, but those numbers are not corresponding here. The reason for that is that once we call a small number subscribe(), we're saying, "Please give me the execution of this." Then it's going to invoke the execution of this, and then finally invoke the execution of this, and invoke the execution of this map based on this clock.
Even though we're going to share the same setInterval(), but we're not sharing this map, so each of these subscribers is going to get a different execution of this map operation. That's why we also need to share this part. Then, once we do that, we see 81, 30, 60, 76, and those numbers appear here.
It's a good idea not to call share() everywhere because every time you call share() you're actually creating a new subject, and that might cause maybe problems with garbage collection. It's generally wise not to do operations that we don't need to do in the first place.
The tip here in order to avoid cold and hot problems is to remember that subscribe means invoke an execution of this collection, and then all of these executions are changed throughout these operators. Some of those executions we want to share, such as, "This random number, I wanna share it," or, "The setInterval() that's inside this creation, I wanna share that as well."
The other bits are going to behave exactly the same way no matter if you share them or not. That's why we don't do it. Over time, you get to develop a sense of what should be shared and what should not be shared, depending on these side effects that may occur.
댓글
댓글 쓰기