Home How replace returned observable with a new one RxJava2
Reply: 1

How replace returned observable with a new one RxJava2

bxfvgekd Published in 2017-11-14 23:01:29Z

I have one case when I need to return an observable immediately, but then replace this observable with another one.

Here is an example

    private Flowable<byte[]> mFlowableStream = Flowable.empty();

    public Flowable<byte[]> startStreamRead() {
        return mFlowableStream;

And then after binding to service I provide it a callback connection like that

public void bindToService() {
    mAppContext.bindService(new Intent(mAppContext,StreamService.class), mServiceConnection, 0);

    public void onServiceConnected(ComponentName name, IBinder binder) {

       mServiceInterection = ((StreamServiceInterection.LocalBinder) binder).getServiceInteractor();
        mStreamDisposable = mServiceInterection.getStream()
                .subscribe(new Consumer<byte[]>() {
                    public void accept(byte[] data) throws Exception {


What I want to do is to somehow replace returned previously mFlowableStream with a new observable that I got from service.

What are possible strategies to implement this ? Maybe I should return some other value, like Future.

Please suggest how to solve this problem


Son Tieu
Son Tieu Reply to 2017-11-15 02:55:08Z

You can use Flowable.create instead of Flowable.empty

Then when new data come, just push to flowable. Like Example

final ArrayList<FlowableEmitter<Integer>> arrEmitter = new ArrayList<>();

    Flowable<Integer> futureFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
        public void subscribe(final FlowableEmitter<Integer> e) throws Exception {

            arrEmitter.add(e); // hold emitter to use later
    }, BackpressureStrategy.BUFFER);

    futureFlow.subscribe(new ResourceSubscriber<Integer>() {
        public void onNext(Integer integer) {
            System.out.println("onNext: " + integer);

        public void onError(Throwable t) {


        public void onComplete() {

    // =========== When data come 
    FlowableEmitter<Integer> holdEmitter = arrEmitter.get(0);


Or use you can use **Subject* type according to your need

Understanding RxJava Subject — Publish, Replay, Behavior and Async Subject

You need to login account before you can post.

About| Privacy statement| Terms of Service| Advertising| Contact us| Help| Sitemap|
Processed in 0.305941 second(s) , Gzip On .

© 2016 Powered by mzan.com design MATCHINFO