stream<T> method

Stream<T> stream<T>(
  Stream<T> Function(CommonDatabase db) executor
)

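Returns the stream produced by `executor`, wrapped in a safe environment: if the database is closed or unavailable when this method is called, an empty stream is returned, and errors that merely reflect the connection being closed are not forwarded to the listener.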

Implementation

/// Returns the stream produced by [executor], relayed through a controller
/// that guards against the database being closed.
///
/// [executor] receives the current database and is only invoked once the
/// returned stream is listened to.
///
/// If the database is closed (or `db` is `null`) when this method is called,
/// an empty stream is returned. If it is closed by the time the stream is
/// listened to, the returned stream completes immediately without events.
///
/// Errors of type [StateError] or [CouldNotRollBackException], and errors
/// reporting `isConnectionClosedException`, are not forwarded to the listener;
/// every other error is forwarded together with its stack trace.
Stream<T> stream<T>(Stream<T> Function(CommonDatabase db) executor) {
  if (isClosed || db == null) {
    return const Stream.empty();
  }

  StreamSubscription<T>? subscription;
  late final StreamController<T> controller;

  controller = StreamController<T>(
    onListen: () {
      if (isClosed || db == null) {
        // The database went away between creating the stream and listening
        // to it. Complete the stream so the listener is not left waiting
        // forever; this mirrors the empty stream returned above.
        controller.close();
        return;
      }

      // Defensive: drop any previous upstream subscription before replacing it.
      final previous = subscription;
      if (previous != null) {
        previous.cancel();
        _subscriptions.remove(previous);
      }

      final current = executor(db!).listen(
        (e) {
          if (!controller.isClosed) {
            controller.add(e);
          }
        },
        onError: (e, StackTrace stackTrace) {
          // Adding to a closed controller throws, so apply the same guard as
          // the data handler.
          if (controller.isClosed) {
            return;
          }
          if (e is! StateError &&
              e is! CouldNotRollBackException &&
              !e.isConnectionClosedException) {
            // Keep the original stack trace so the failure can be traced.
            controller.addError(e, stackTrace);
          }
        },
        onDone: () => controller.close(),
      );

      subscription = current;
      _subscriptions.add(current);
    },
    onCancel: () {
      // Always unregister the controller, even if no upstream subscription
      // was ever created, so it does not linger in _controllers.
      _controllers.remove(controller);

      final active = subscription;
      if (active != null) {
        active.cancel();
        _subscriptions.remove(active);
      }
    },
  );
  _controllers.add(controller);

  return controller.stream;
}