What is RxJava?
RxJava is a Java based extension of ReactiveX. It provides implementation or ReactiveX project in Java. Following are the key characteristics of RxJava.
What is ReactiveX?
ReactiveX is a project which aims to provide reactive programming concept to various programming languages. Reactive Programming refers to the scenario where program reacts as and when data appears. It is a event based programming concept and events can propagate to registers observers.
As per the Reactive, they have combined the best of Observer pattern, Iterator pattern and functional pattern.
ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.
What is Functional Programming?
Functional programming revolves around building the software using pure functions.
A pure function do not depends upon previous state and always returns the same result for the same parameters passed.
Pure functions helps avoiding problems associated with shared objects, mutable data and side effects often prevalent in multi-threading environments.
What is Reactive Programming?
Reactive programming refers to event driven programming where data streams comes in asynchronous fashion and get processed when they are arrived.
What is Functional Reactive Programming?
RxJava implements both the concepts together, where data of streams changes over time and consumer function reacts accordingly.
What is Reactive Manifesto?
Reactive Manifesto is an on-line document stating the high standard of application software systems. As per the manifesto, following are the key attributes of a reactive software :-
Explain Key components of RxJava.
RxJava have two key components: Observables and Observer.
An observer is never notified if items are not present or a callback is not returned for a previous item.
RxJava - How Observable works ?
Observables represents the sources of data where as Observers (Subscribers) listen to them. In nutshell, an Observable emits items and a Subscriber then consumes these items.
Observable
Subscriber
RxJava - Creating Observables
Following are the base classes to create observables.
Explain the convenient methods to create observables in Observable class.
Following are the convenient methods to create observables in Observable class.
RxJava - Single Observable
The Single class represents the single value response. Single observable can only emit either a single successful value or an error. It does not emit onComplete event.
Class Declaration
Following is the declaration for io.reactivex.Single<T> class −</T>
public abstract class Single<T>
extends Object
implements SingleSource<T></T></T>
Protocol
Following is the sequential protocol that Single Observable operates −
onSubscribe (onSuccess | onError)?
RxJava - Single Observable Example
Single Example
import java.util.concurrent.TimeUnit;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create the observable
Single<String> testSingle = Single.just("Hello World");</String>
//Create an observer
Disposable disposable = testSingle
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(
new DisposableSingleObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
});
Thread.sleep(3000);
//start observing
disposable.dispose(); } }Hello World
RxJava - MayBe Observable
The MayBe class represents deferred response. MayBe observable can emit either a single successful value or no value.
Class Declaration
Following is the declaration for io.reactivex.Single<T> class −</T>
public abstract class Maybe<T>
extends Object
implements MaybeSource<T></T></T>
Protocol
Following is the sequential protocol that MayBe Observable operates −
onSubscribe (onSuccess | onError | OnComplete)?
RxJava - MayBe Observable Example
MayBe Example
import java.util.concurrent.TimeUnit;
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create an observer
Disposable disposable = Maybe.just(“Hello World”)
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}</String>
@Override
public void onSuccess(String value) {
System.out.println(value);
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
//start observing
disposable.dispose(); } }RxJava - Completable Observable
The Completable class represents deferred response. Completable observable can either indicate a successful completion or error.
Class Declaration
Following is the declaration for io.reactivex.Completable class −
public abstract class Completable
extends Object
implements CompletableSource
Protocol
Following is the sequential protocol that Completable Observable operates −
onSubscribe (onError | onComplete)?
RxJava - Completable Observable Example
Completable Example
import java.util.concurrent.TimeUnit;
import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create an observer
Disposable disposable = Completable.complete()
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onStart() {
System.out.println("Started!");
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
//start observing
disposable.dispose(); } }Started Done
RxJava - Using CompositeDisposable
The CompositeDisposable class represents a container which can hold multiple disposable and offers O(1) complexity of adding and removing disposables.
Class Declaration
Following is the declaration for io.reactivex.disposables.CompositeDisposable class −
public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer
RxJava - Using CompositeDisposable Example
CompositeDisposable Example
import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
CompositeDisposable compositeDisposable = new CompositeDisposable();
//Create an Single observer
Disposable disposableSingle = Single.just("Hello World")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(
new DisposableSingleObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
});
//Create an observer
Disposable disposableMayBe = Maybe.just("Hi")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
compositeDisposable.add(disposableSingle);
compositeDisposable.add(disposableMayBe);
//start observing
compositeDisposable.dispose(); } }Hello World Hi
RxJava - Creating Operators
Following are the operators which are used to create an Observable.
1. Create : Creates an Observable from scratch and allows observer method to call programmatically.
Creating Operator Example
import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {“a”, “b”, “c”, “d”, “e”, “f”, “g”};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.map(String::toUpperCase)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}</String>
ABCDEFG
RxJava - Transforming Operators
Following are the operators which are used to transform an item emitted from an Observable:-
RxJava - Filtering Operators
Following are the operators which are used to selectively emit item(s) from an Observable:-
RxJava - Filtering Operator Example
Filtering Operator Example
import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {“a”, “b”, “c”, “d”, “e”, “f”, “g”};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.take(2)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}</String>
ab
RxJava - Combining Operators
Following are the operators which are used to create a single Observable from multiple Observables :-