stream<T> method
- Stream<T> executor(CommonDatabase db)
Returns the Stream executed in a wrapped safe environment.
Implementation
/// Wraps the stream produced by [executor] in a guarded [StreamController].
///
/// Returns an empty stream immediately when [isClosed] is true or [db] is
/// null. Otherwise [executor] is invoked lazily, when the returned stream is
/// first listened to, and its events are relayed through a controller that
/// is tracked in `_controllers`; the upstream subscription is tracked in
/// `_subscriptions`.
///
/// Relay rules:
/// * data events are forwarded unless the controller is already closed;
/// * errors are forwarded (with their stack trace) unless they are a
///   [StateError], a [CouldNotRollBackException], or report
///   `isConnectionClosedException`, or the controller is already closed;
/// * completion of the source closes the controller.
///
/// Cancelling the returned stream cancels the upstream subscription and
/// drops both the subscription and the controller from the tracking
/// collections.
Stream<T> stream<T>(Stream<T> Function(CommonDatabase db) executor) {
  if (isClosed || db == null) {
    return const Stream.empty();
  }
  StreamSubscription? subscription;
  StreamController<T>? controller;
  controller = StreamController(
    onListen: () {
      // The database may have been closed between creating the stream and
      // listening to it; in that case the source is never started.
      // NOTE(review): nothing here closes the controller in that case —
      // confirm that whoever closes the database also closes `_controllers`.
      if (isClosed || db == null) {
        return;
      }
      // Defensive: drop any previous upstream subscription before replacing it.
      if (subscription != null) {
        subscription?.cancel();
        _subscriptions.remove(subscription);
      }
      subscription = executor(db!).listen(
        (e) {
          if (controller?.isClosed != true) {
            controller?.add(e);
          }
        },
        // `e` stays dynamic on purpose: `isConnectionClosedException` is
        // resolved exactly as in the original code.
        onError: (dynamic e, StackTrace stackTrace) {
          final bool ignorable = e is StateError ||
              e is CouldNotRollBackException ||
              e.isConnectionClosedException;
          if (ignorable) {
            return;
          }
          // Mirror the data path: adding to a closed controller throws, so a
          // late error after close is dropped instead of crashing the relay.
          if (controller?.isClosed != true) {
            controller?.addError(e, stackTrace);
          }
        },
        onDone: () => controller?.close(),
      );
      _subscriptions.add(subscription!);
    },
    onCancel: () {
      if (subscription != null) {
        subscription?.cancel();
        _subscriptions.remove(subscription);
      }
      // Always stop tracking the controller, even when onListen bailed out
      // before a subscription was created.
      _controllers.remove(controller);
    },
  );
  _controllers.add(controller);
  return controller.stream;
}