Using RxDart with Bloc(library) - Part 1

February 15, 2021 ■ 9 min read

In this post we will look at how to use RxDart with the Bloc library. The post is split into two parts to keep each one short. In part 1, we will see how to use RxDart to transform the events and states in a Bloc. In part 2, we will look at how to handle a stream of data coming from your service or repository layer and convert it into a stream of states inside a Bloc.

You might not have come across such a use case in your app yet, but keep reading if you're curious, or maybe this is exactly what you're looking for 🙂

Note: In this post I won't be talking about using RxDart with the Bloc pattern. There are already some good posts on the web you can refer to for that. This post is specifically about using RxDart with the Bloc library.

Why RxDart

Whenever your app has to deal with a continuous stream of data (events, states, etc.), you can use a Stream to represent it. Dart already has good built-in support for streams. All you have to do is import the dart:async library.

However, in some cases you may want the reactive extension methods that RxDart provides on top of the native Stream API to make things easier. We will look at a few such examples in this post, using them with the Bloc library.

Bloc Transformations

If you look at the Bloc library, a Bloc is nothing but a box that accepts a Stream<Event> as input and emits a Stream<State> as output, based on the application logic written inside the mapEventToState method.

However, apart from mapEventToState, there are a couple of other methods in a Bloc that you can override to control the incoming Stream<Event> and the outgoing Stream<State>.

Note: We will only see a few examples using transformEvents in this post. Similarly, you could override transformTransitions to transform the Stream<Transition<Event, State>> and control your outgoing state changes.

Transforming Events

If we look at the default implementation of the transformEvents method inside Bloc, it looks like the following:

Stream<Transition<Event, State>> transformEvents(
  Stream<Event> events,
  TransitionFunction<Event, State> transitionFn,
) {
  return events.asyncExpand(transitionFn);
}

Looking at the return statement above, we can see that it applies the asyncExpand operator to our Stream<Event>. This means that, by default, events are processed one at a time, in the order in which they are received.
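To see that ordering guarantee in isolation, here is a minimal sketch that uses only dart:async. The handle function and its delays are made up for illustration and simply stand in for Bloc's transitionFn.

```dart
import 'dart:async';

/// Hypothetical stand-in for transitionFn: every event produces a "start"
/// and a "done" marker. Event 1 deliberately takes longer than event 2.
Stream<String> handle(int event) async* {
  yield 'start $event';
  await Future<void>.delayed(Duration(milliseconds: event == 1 ? 20 : 5));
  yield 'done $event';
}

/// Pushes events 1 and 2 through asyncExpand and collects everything emitted.
Future<List<String>> runSequentially() {
  return Stream.fromIterable([1, 2]).asyncExpand(handle).toList();
}
```

Awaiting runSequentially() gives [start 1, done 1, start 2, done 2]: even though event 1 is slower, event 2 is not touched until the stream for event 1 has finished.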

But let's say we have a couple of use cases in our app where we want to control or transform these events before they reach our mapEventToState method.

Case #1 - Text Search / Form Validation

Let's say we fire an event from the UI for every key the user types.

add(SearchEvent(input : <textInputTyped>))

With the default behavior of Bloc (as we just saw above), our mapEventToState method will be called for each keystroke, with a SearchEvent containing the latest typed input.


Stream<SearchState> mapEventToState(SearchEvent event) async* {
  try {
    yield SearchState.loading();
    final places = await _placesService.findPlaces(event.input);
    yield SearchState.success(places);
  } catch (e) {
    yield SearchState.error(e.message);
  }
}

However, this will result in our Places API being called every time the user types something in the TextField. Shouldn't we search only once the user is done typing? 🤔 What if we could make Bloc call our mapEventToState method only after the user has actually stopped typing? Can we do it?

Yes, and that is where the transformEvents method comes into the picture. With it we can control the stream of events added to the Bloc through the add() method. By overriding transformEvents inside the bloc and applying RxDart operators such as debounceTime and switchMap (available after importing package:rxdart/rxdart.dart) to the Stream<Event>, we can get the intended behavior.


Stream<Transition<SearchEvent, SearchState>> transformEvents(
    Stream<SearchEvent> events, transitionFn) {
  return events
      .debounceTime(const Duration(milliseconds: 300))
      .switchMap(transitionFn);
}

Let's look at the individual RxDart operators we're applying here.

  • debounceTime
  • switchMap

debounceTime

By applying debounceTime to the Stream<Event>, we debounce events with a 300 ms window: an event is passed on only once 300 ms have gone by without a newer event arriving. So while the user is still typing and less than 300 ms have passed between two SearchEvents, we don't end up running our logic in mapEventToState. This keeps us from spamming our Places API with incomplete inputs. But why switchMap, though?
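Before getting to switchMap, the debounce idea itself can be sketched with nothing but dart:async. This is a simplified illustration, not RxDart's actual implementation of debounceTime; the debounce and simulatedTyping names, the "Delhi" input and all timings are assumptions made for the example.

```dart
import 'dart:async';

/// Simplified debounce: an event is forwarded only once [window] has passed
/// without a newer event arriving. Illustration only, not RxDart's code.
Stream<T> debounce<T>(Stream<T> source, Duration window) {
  final controller = StreamController<T>();
  StreamSubscription<T>? subscription;
  Timer? timer;
  void Function()? flush; // emits the event that is currently waiting, if any

  controller.onListen = () {
    subscription = source.listen(
      (event) {
        timer?.cancel(); // a newer event restarts the quiet period
        flush = () {
          flush = null;
          controller.add(event);
        };
        timer = Timer(window, () => flush?.call());
      },
      onError: controller.addError,
      onDone: () {
        timer?.cancel();
        flush?.call(); // choice made for this sketch: emit the pending event
        controller.close();
      },
    );
  };
  controller.onCancel = () async {
    timer?.cancel();
    await subscription?.cancel();
  };
  return controller.stream;
}

/// Hypothetical user input: three quick keystrokes, a pause, one more input.
Stream<String> simulatedTyping() async* {
  yield 'D';
  await Future<void>.delayed(const Duration(milliseconds: 10));
  yield 'De';
  await Future<void>.delayed(const Duration(milliseconds: 10));
  yield 'Del';
  await Future<void>.delayed(const Duration(milliseconds: 300));
  yield 'Delhi';
}
```

Collecting debounce(simulatedTyping(), const Duration(milliseconds: 100)) with toList() gives [Del, Delhi]: the quick "D" and "De" keystrokes never reach the listener, which is what keeps the incomplete inputs away from the API.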

switchMap

According to the RxDart documentation for switchMap, when this operator is applied, the newly created Stream will be listened to and begin emitting items, and any previously created Stream will stop emitting.

Sorry, what?! 😅 In simple words... sorry, I mean in a diagram.

Let's say the user has finished typing "De" and is waiting for the results.

[Image: call_in_progress]

While the above API call is still in progress, suppose the user decides they need more specific results and types one more letter, "l", making the search input "Del". In that case we don't want the results of the previous API call to show up, only the latest search results for "Del". This is what switchMap allows us to do. By applying it, we ignore the previous results and, with them, the states coming from the old input.

[Image: api_call_cancelled]

You might ask: why ignore the results for the previous input "De" instead of showing them in the UI?

Good point! But if the Places API returns the results for "De" after the results for "Del", we might end up showing results for "De" instead of "Del" simply because they arrived later.
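The "latest input wins" behavior can be sketched with plain dart:async as well. The switchLatest helper below is a simplified, hand-written stand-in for what switchMap does, not RxDart's implementation; fakeSearch, typedQueries and their delays are hypothetical and only simulate a slow response for "De" and a fast one for "Del".

```dart
import 'dart:async';

/// Simplified switchMap: whenever a new event arrives, the stream created
/// for the previous event is cancelled, so its late results are dropped.
Stream<R> switchLatest<T, R>(Stream<T> source, Stream<R> Function(T) mapper) {
  final controller = StreamController<R>();
  StreamSubscription<T>? outer;
  StreamSubscription<R>? inner;
  var outerDone = false;

  controller.onListen = () {
    outer = source.listen(
      (event) {
        inner?.cancel(); // stop listening to the previous event's stream
        inner = mapper(event).listen(
          controller.add,
          onError: controller.addError,
          onDone: () {
            inner = null;
            if (outerDone) controller.close();
          },
        );
      },
      onError: controller.addError,
      onDone: () {
        outerDone = true;
        if (inner == null) controller.close();
      },
    );
  };
  controller.onCancel = () async {
    await inner?.cancel();
    await outer?.cancel();
  };
  return controller.stream;
}

/// Hypothetical search call: "De" is slow to answer, everything else is fast.
Stream<String> fakeSearch(String query) async* {
  await Future<void>.delayed(Duration(milliseconds: query == 'De' ? 100 : 20));
  yield 'results for $query';
}

/// Hypothetical input: "De", then "Del" shortly afterwards.
Stream<String> typedQueries() async* {
  yield 'De';
  await Future<void>.delayed(const Duration(milliseconds: 10));
  yield 'Del';
}
```

Collecting switchLatest(typedQueries(), fakeSearch) with toList() gives only [results for Del]. The slow "De" response is never forwarded, so it cannot overwrite the newer results in the UI.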


Case #2 - Add To Cart / Remove From Cart

[Image: add_to_remove_from_cart]

Note: We're only covering the case where we add or remove a single quantity of a particular product in the cart. For multiple quantities, you would need to include a qty field in the event classes shown below.

Let's say we have a CartBloc to handle adding and removing items in the cart.
We can represent those events as follows:

abstract class CartEvent extends Equatable {
  const CartEvent();

  
  List<Object> get props => [];
}

class AddToCartEvent extends CartEvent {
  final String productId;
  final String cartId;

  AddToCartEvent({
    this.productId,
    this.cartId,
  });

  
  List<Object> get props => [productId, cartId];
}

class RemoveFromCartEvent extends CartEvent {
  final String productId;
  final String cartId;

  RemoveFromCartEvent({
    this.productId,
    this.cartId,
  });

  
  List<Object> get props => [productId, cartId];
}

And, this is how our mapEventToState method handles those events.


Stream<CartState> mapEventToState(CartEvent event) async* {
  if (event is AddToCartEvent) {
    yield* _mapAddToCartEventToState(event);
  } else if (event is RemoveFromCartEvent) {
    yield* _mapRemoveFromCartEventToState(event);
  }
}

Stream<CartState> _mapAddToCartEventToState(AddToCartEvent event) async* {
  try{
    yield CartState.addingToCart(event.productId);

    final itemId = await _cartService.addToCart(event.cartId, event.productId);
    yield CartState.addedToCart(event.productId);
  }
  catch (e) {
    yield CartState.addToCartError(event.productId, e.message);
  }
}

Stream<CartState> _mapRemoveFromCartEventToState(RemoveFromCartEvent event) async* {
  try{
    yield CartState.removingFromCart(event.productId);

    final itemId = await _cartService.removeFromCart(event.cartId, event.productId);
    yield CartState.removedFromCart(event.productId);
  }
  catch (e) {
    yield CartState.removeFromCartError(event.productId, e.message);
  }
}

Good enough? So, what's the problem here? As we saw earlier, the default behavior of transformEvents will trigger mapEventToState for each AddToCartEvent and RemoveFromCartEvent, which is correct.

However, if the user keeps spamming the "Add/Remove" button for a particular veggie, it will result in multiple add/remove API calls being made for the same veggie unnecessarily. We could put logic in place to disable the "Add/Remove" button in the addingToCart/removingFromCart states while the API call is in progress. But we can also override transformEvents here and provide a custom implementation that avoids this scenario entirely, without relying on disabled buttons.

We can override transformEvents to prevent duplicate add/remove events like below:


Stream<Transition<CartEvent, CartState>> transformEvents(
    Stream<CartEvent> events, transitionFn) {
  return super.transformEvents(
    events.distinct(),
    transitionFn,
  );
}

Or you could write it like this (both are equivalent):


Stream<Transition<CartEvent, CartState>> transformEvents(
    Stream<CartEvent> events, transitionFn) {
  return events.distinct().asyncExpand(transitionFn);
}

Using the distinct operator on our Stream<Event> here makes sure that two successive AddToCartEvents or RemoveFromCartEvents for the same productId and cartId combination are not let through. So even if someone keeps spamming the "Add/Remove" button for the same product, it won't result in duplicate API calls, because our mapEventToState isn't called in such cases. Plus, in this case we didn't need Rx at all, and Dart's native Stream API was sufficient 😁 #FakeBlogTitle
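As a quick check of that behavior using only dart:async, here is a small sketch. The AddToCart class is a hypothetical stand-in for AddToCartEvent with hand-written value equality (roughly what Equatable derives from props), and the product and cart ids are made up.

```dart
import 'dart:async';

/// Hypothetical event with hand-written value equality, standing in for
/// an Equatable event whose props are [productId, cartId].
class AddToCart {
  const AddToCart(this.productId, this.cartId);

  final String productId;
  final String cartId;

  @override
  bool operator ==(Object other) =>
      other is AddToCart &&
      other.productId == productId &&
      other.cartId == cartId;

  @override
  int get hashCode => Object.hash(productId, cartId);

  @override
  String toString() => 'AddToCart($productId)';
}

/// Simulates a user tapping "Add" twice on carrot, once on tomato,
/// and then on carrot again, with distinct() applied to the taps.
Future<List<String>> dedupeTaps() {
  final taps = Stream.fromIterable([
    AddToCart('carrot', 'cart-1'),
    AddToCart('carrot', 'cart-1'), // equal to the previous event: dropped
    AddToCart('tomato', 'cart-1'),
    AddToCart('carrot', 'cart-1'), // previous event differs: passes
  ]);
  return taps.distinct().map((event) => event.toString()).toList();
}
```

Awaiting dedupeTaps() gives [AddToCart(carrot), AddToCart(tomato), AddToCart(carrot)]. Note that distinct only compares each event with the one immediately before it, so the same event is let through again as soon as a different event has passed in between.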

What if the API calls fail? In that case, because of the distinct operator, users wouldn't be able to retry adding/removing the same product in the cart, no? 🤔 Yes. To prevent that, we have to add a CartFailureEvent with add(CartFailureEvent()) in the catch blocks when the API call fails. Because this event differs from the one that failed, distinct lets a repeat of the same add/remove event through afterwards, so the user can retry the failed action.

// ...
// ...
Stream<CartState> _mapAddToCartEventToState(AddToCartEvent event) async* {
  try{
    yield CartState.addingToCart(event.productId);

    final itemId = await _cartService.addToCart(event.cartId, event.productId);
    yield CartState.addedToCart(event.productId);
  }
  catch (e) {
    // Breaks the run of identical events so that distinct() allows a retry.
    add(CartFailureEvent());
    yield CartState.addToCartError(event.productId, e.message);
  }
}

// ...
// ...
class CartFailureEvent extends CartEvent {}

At this point, it doesn't look like a good case for using transformEvents at all, no? 😂 Yeah, but it is a pretty good trade-off to handle everything inside the Bloc versus having a mix of logic in the UI (disabling buttons during some states) and the Bloc.

Note: Adding the distinct operator like above doesn't prevent the user from simultaneously adding/removing different products in the cart, which is the intended behavior. This is because equality is checked on the combination of productId and cartId.


Apart from the above 2 cases, there are a few other use cases where we could use RxDart with Bloc to transform events and transitions and so control your events and states. Let me know in the comments below how you use transformEvents inside your app. I learnt this from the FlutterSaurus codebase by Felix (the Bloc library author) and have been using it for a while, though I never got time to write about it. Better late than never 😅

Please stay tuned for the next post, where we will see how to handle a Stream of data coming in from the service/repository layer and convert it to a Stream of states inside your mapEventToState method.

Want to be notified of similar posts? Follow me @punit__d on Twitter 🙂