changes method

Stream<List<MapChangeNotification<K, T>>> changes()

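Returns a stream that, for each map emitted by this stream, emits the list of MapChangeNotifications (added, updated and removed entries) describing how that map differs from the previously emitted one. The list for the first map is always emitted, even if it is empty; later lists are emitted only when they are non-empty.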

Implementation

/// Converts each map emitted by this stream into the list of
/// [MapChangeNotification]s describing how it differs from the previously
/// emitted map.
///
/// The first map is compared against an empty map, and its list is always
/// emitted, even when it is empty. Every later map produces an emission only
/// if at least one entry was added, updated or removed.
///
/// Key presence is determined with [Map.containsKey], so an entry whose value
/// is `null` (possible when `T` is nullable) counts as present rather than
/// being mistaken for a missing key.
Stream<List<MapChangeNotification<K, T>>> changes() {
  // Snapshot of the previously processed map; starts out empty so that every
  // entry of the first event is reported as added.
  Map<K, T> last = {};
  bool first = true;

  return asyncExpand((e) async* {
    final List<MapChangeNotification<K, T>> changed = [];

    // Entries that are new, or whose value differs from the previous snapshot.
    for (final MapEntry<K, T> entry in e.entries) {
      if (!last.containsKey(entry.key)) {
        changed.add(MapChangeNotification.added(entry.key, entry.value));
      } else if (entry.value != last[entry.key]) {
        changed.add(
          MapChangeNotification.updated(entry.key, entry.key, entry.value),
        );
      }
    }

    // Entries that were in the previous snapshot but are gone now.
    for (final MapEntry<K, T> entry in last.entries) {
      if (!e.containsKey(entry.key)) {
        changed.add(MapChangeNotification.removed(entry.key, entry.value));
      }
    }

    // Copy the map so later mutations of `e` by its owner cannot affect the
    // next comparison.
    last = Map.of(e);

    // Always emit the first changes, even if they are empty.
    if (first || changed.isNotEmpty) {
      first = false;
      yield changed;
    }
  });
}