Home How replace returned observable with a new one RxJava2
Reply: 1

How replace returned observable with a new one RxJava2

bxfvgekd
1#
bxfvgekd Published in 2017-11-14 23:01:29Z

I have one case when I need to return an observable immediately, but then replace this observable with another one.

Here is an example

    private Flowable<byte[]> mFlowableStream = Flowable.empty();

   @Override
    public Flowable<byte[]> startStreamRead() {
        bindToService();
        return mFlowableStream;
    }

And then after binding to service I provide it a callback connection like that

@Override
public void bindToService() {
    mAppContext.bindService(new Intent(mAppContext,StreamService.class), mServiceConnection, 0);
}

    @Override
    public void onServiceConnected(ComponentName name, IBinder binder) {

       mServiceInterection = ((StreamServiceInterection.LocalBinder) binder).getServiceInteractor();
        mStreamDisposable = mServiceInterection.getStream()
                .subscribe(new Consumer<byte[]>() {
                    @Override
                    public void accept(byte[] data) throws Exception {

                    }
                });
    }

What I want to do is to somehow replace returned previously mFlowableStream with a new observable that I got from service.

What are possible strategies to implement this ? Maybe I should return some other value, like Future.

Please suggest how to solve this problem

Thanks

Son Tieu
2#
Son Tieu Reply to 2017-11-15 02:55:08Z

You can use Flowable.create instead of Flowable.empty

Then when new data come, just push to flowable. Like Example

final ArrayList<FlowableEmitter<Integer>> arrEmitter = new ArrayList<>();

    Flowable<Integer> futureFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(final FlowableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);

            arrEmitter.add(e); // hold emitter to use later
        }
    }, BackpressureStrategy.BUFFER);

    futureFlow.subscribe(new ResourceSubscriber<Integer>() {
        @Override
        public void onNext(Integer integer) {
            System.out.println("onNext: " + integer);
        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });


    // =========== When data come 
    FlowableEmitter<Integer> holdEmitter = arrEmitter.get(0);

    holdEmitter.onNext(3);

Or use you can use **Subject* type according to your need

Understanding RxJava Subject — Publish, Replay, Behavior and Async Subject

You need to login account before you can post.

About| Privacy statement| Terms of Service| Advertising| Contact us| Help| Sitemap|
Processed in 0.305941 second(s) , Gzip On .

© 2016 Powered by mzan.com design MATCHINFO