Using RxDart with Bloc(library) - Part 2

March 13, 2021 ■ 3 min read
article banner

This is 2nd part in 2 series post on using RxDart with Bloc library. In 1st part, we saw how we can use RxDart to transform events and states inside Bloc for some specific cases. In this post, we will see how we can handle stream of data and convert it to stream of states. Let's get started! 🚀

Futures vs Streams in Bloc

When getting some data from api/db, most of the time inside our Bloc we talk to some layer like service or repository which usually returns one time async data using Future. This is how we consume Future and convert it to states usually.

  1. Show some loading indicator.
  2. Fetch data from your service/repository.
  3. Based on result from previous step, show success/error state.
Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    try {
      // Show loading indicator
      yield TodoState.loading();

      // Get the data from service
      final todos = await _service.getTodos();

      // Show data in UI
      yield TodoState.success(todos);
    } on GetTodosException catch (e) {
      yield TodoState.error(e.errorMessage);
    } catch (e) {
      yield TodoState.error(genericErrorMessage);
    }
  }
}

However, sometimes you might receive data which is continuous instead of one time.

E.x - Firebase Realtime DB data, WebSockets, etc.

We can represent this continuous data as a Stream in Dart. Can we write similar logic when dealing with Stream of data(Todos) inside our mapEventToState method 🤔 Let's try that out.

StreamSubscription _todosSubscription;
...
...
Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    _todosSubscription?.cancel();
    _todosSubscription = _service.todos().listen(
      (todos) {
        yield TodoState.success(todos);
      },
      onError: (e, _) {
        yield TodoState.error(e.errorMessage);
      },
    );
  }
}

WAIT! That is not valid code. You cannot yield state from inside that listen() method, no? Because it is isnt' async generator method like our mapEventToState() method and we can only yield from inside async generator. What do we do then 🤔

Quick Google-Fu told me we can emit extra events from inside our listen and onError methods and then listen to those events inside mapEventToState method to yield states. WHAT 😅?

Something like:

StreamSubscription _todosSubscription;
...
...
Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    _todosSubscription?.cancel();
    _todosSubscription = _service.todos().listen(
      (todos) {
        add(TodosLoadedEvent(todos));
      },
      onError: (e, _) {
        add(TodosErrorEvent(e));
      },
    );
  } else if (event is TodosLoadedEvent) {
    yield TodoState.success(todos);
  } else if (event is TodosErrorEvent) {
    yield TodoState.error(event.errorMessage);
  }
}

We can make it bit more readable by using yield* and separating different _mapEventToState methods.

StreamSubscription _todosSubscription;
...
...
Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    yield* _mapGetTodosEventToState(event);
  } else if (event is TodosLoadedEvent) {
    yield* _mapTodosLoadedEventToState(event);
  } else if (event is TodosErrorEvent) {
    yield* _mapTodosErrorEventToState(event);
  }
}

Stream<TodoState> _mapGetTodosEventToState(GetTodosEvent event) {
  _todosSubscription?.cancel();
  _todosSubscription = _service.todos().listen(
    (todos) {
      add(TodosLoadedEvent(todos));
    },
    onError: (e, _) {
      add(TodosErrorEvent(e));
    },
  );
}

Stream<TodoState> _mapTodosLoadedEventToState(TodosLoadedEvent event) async* {
  yield TodoState.success(todos);
}

Stream<TodoState> _mapTodosErrorEventToState(TodosErrorEvent event) async* {
  yield TodoState.error(event.errorMessage);
}

This is however feels wrong due to following reasons:

  • Unnecessary events(TodosLoadedEvent and TodosErrorEvent) needs to be created to convert incoming data from our stream into states.
  • Too many hoops to jump to understand the flow.

Is there any other way? Of course might be, right? Otherwise what was the purpose of this blog post 😜

Stream and Async Generator

Before we come up with our approach which will not involve creating additional events like above, we should first understand how streams and async generator can be used together.

Complete example which we will be going through:

import 'dart:async';
import 'package:rxdart/rxdart.dart';

void main(List<String> arguments) async {
  final _intController = StreamController<int>.broadcast();

  final _subscription =
      _intController.stream.asyncExpand(_mapEventToState).listen(
    (value) {
      print('listen : $value');
    },
    onError: (e) {
      print('onError: ${e.message}');
    },
  );

  _intController.add(10);
  Future.delayed(Duration(seconds: 1), () {
    _intController.add(11);
  });
}

Stream<int> _mapEventToState(int to) async* {
  if (to <= 10) {
    for (var i = 1; i <= to; i++) {
      yield i;
    }
  } else {
    yield* RangeStream(to, to + 9); // It is endInclusive
  }
}

Here we've StreamController of type int. It emits two ints - 10 and 11(after 1s delay).

// ...
void main(List<String> arguments) async {
  final _intController = StreamController<int>.broadcast();
  // ...
  // Removed for brevity
  _intController.add(10);  Future.delayed(Duration(seconds: 1), () {    _intController.add(11);  });
  //... 
}
// ...
// ...

We listen to stream emitted from this StreamController and apply _mapEventToState() method to result received before printing out final value to console output.

import 'dart:async';
import 'package:rxdart/rxdart.dart';

void main(List<String> arguments) async {
  final _intController = StreamController<int>.broadcast();
  final _subscription =      _intController.stream.asyncExpand(_mapEventToState).listen(    (value) {      print('listen : $value');    },    onError: (e) {      print('onError: ${e.message}');    },  );
  //...
  //...
}
Stream<int> _mapEventToState(int to) async* {  if (to <= 10) {    for (var i = 1; i <= to; i++) {      yield i;    }  } else {    yield* RangeStream(to, to + 9); // It is endInclusive  }}

This is similar to how Bloc converts our Stream<Event> to Stream<State> using the logic we write inside mapEventToState method. Though this is not exactly how it is done inside bloc but you get the gist. So, let's focus now on _mapEventToState method here.

Stream<int> _mapEventToState(int to) async* {
  if (to <= 10) {
    for (var i = 1; i <= to; i++) {
      yield i;
    }
  } else {
    yield* RangeStream(to, to + 10);
  }
}

We can see that it is async generator which converts one Stream<int> into another Stream<int> based on input paramter to passed to it.

If to <= 10, it goes through for loop and emits int's one by one from 1 to 10 using yield keyword. This is similar to how we yield our states inside mapEventToState method inside Bloc when dealing with Future as we saw earlier.

If to > 10, it will tell RangeStream from RxDart which is another stream emitting a sequence of Integers within a specified range. In this case from to to to + 9.

Console ouput of the code would be:

listen : 1
listen : 2
listen : 3
listen : 4
listen : 5
listen : 6
listen : 7
listen : 8
listen : 9
listen : 10
listen : 11
listen : 12
listen : 13
listen : 14
listen : 15
listen : 16
listen : 17
listen : 18
listen : 19
listen : 20
Exited

Final Solution

We saw that using yield* on source of stream directly can work without creating adding additional events as done earlier, right? Let's try that out on original example.

Using the relation between async generator and stream we learned above, we can do the following:


Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    yield* _service.todos();
  }
}

We still need to convert Stream<Todos> to Stream<TodoState> for our UI to render, correct? We can do that by using map and couple other RxDart lib operators like onErrorReturnWith and startWith.


Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    yield* _service
        .todos()
        .map<TodoState>((todos) => TodoState.success(todos))
        .onErrorReturnWith((err) => TodoState.error(err.message))
        .startWith(TodoState.loading());
  }
}
  • map => converts list of todos to TodoState.success
  • onErrorReturnWith => called whenever todos stream emits any error and is converted to TodoState.error
  • startWith => called when stream is subscribed first time on call of GetTodosEvent and is converted to TodoState.loading.

By doing this we're connecting entire stream chain inside bloc like following:

// This is how it connects inside bloc
events.asyncExpand((event){
  if (event is GetTodosEvent) {
    yield* _service
        .todos()
        .map<TodoState>((todos) => TodoState.success(todos))
        .onErrorReturnWith((err) => TodoState.error(err.message))
        .startWith(TodoState.loading());
  }
}).listen((transition) {
  // bloc emits transitions and state here internally
  // which "BlocBuilder" and "BlocConsumer" listens to
})

As entire chain in one stream subscription, the bonus benefit we get from this is that we don't need to worry about canceling stream subscription as well because bloc takes care of it internally here. That's it! By using yield* and using some rx operators we have made code a bit more readable and avoided having to create extra events for it.

I've created sample repo in case you want to play with small todo example we're talking about in this post. Hope that was useful. I came across this use case in one of the app i was working on so decided to share it. Thanks for reading 🙂

Want to be notified of similar posts? Follow me @punit__d on Twitter 🙂