LJ的Blog

学海无涯苦做舟

0%

RxJava从放弃到入门(二):学,不能停

写在前面

上篇文章RxJava从放弃到入门(一):基础篇讲述的东西都是非常基础的东西,这一篇准备讲述以下一些东西:

创建Observable:

  • just
  • defer

操作符:

  • map
  • flatmap

经验总结:

  • 自己遇到的一些坑和经验

再叙Observable

在上一篇里我们已经了解了在RxJava中Observable扮演了什么样的角色,如果你还没看过,没事,我们一起来回忆以下:在RxJava中,Observable扮演的是一个发射数据或数据序列的角色。Observer则是接收Observable发射的东西。上次只提到了一种hello world的打印方法,是否会让你感到不爽?让我们当一回孔乙己,学一下“茴”字的N种写法。当然在这我并不会以流水账的形式记录每一种创建操作的流程,那样无疑是没有必要的,实在想要了解你可以去看文档。

just

首先肯定还是我们的hello world

1
2
3
4
5
6
Observable.just("hello","world").subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, s);
}
});

just.png

上面出现了一个Action1,你去看他的继承关系可能会让你疑惑,因为接收数据的Observer或者是其子类都跟他没啥关系。没关系,我们直接看源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final Subscription subscribe(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}

return subscribe(new Subscriber<T>() {

@Override
public final void onCompleted() {
// do nothing
}

@Override
public final void onError(Throwable e) {
throw new OnErrorNotImplementedException(e);
}

@Override
public final void onNext(T args) {
onNext.call(args);
}

});
}

可以很明显的看到在这个方法内部,还是有一个Subscriber来接收数据的,这个Subscriber是观察者Observer的实现类(Subscriber本身是一个抽象类)。不同的只是在onNext方法中将处理数据的逻辑交给接口Action1的call方法,而最终实现逻辑是交由用户来实现的。这种设计明显是为了方便我们使用,弄明白这一点后,我们可以就可以安心的使用Action1来跑我们的代码了。

使用just方法创建的Observable会按照顺序将参数一一发射,如果不能理解的话,恩,官方文档的图也是极好的:
文档

看了just之后你可能会觉得这玩意很方便,可以不用自己手动调subscriber.onNext()和其他的方法了~但是有得必有失,你用一个create方法从头开始创建一个observable时,你对这个observable的特性是了解的。而你将目光投向一个经过封装的方法时,你需要花费更多的时间去了解他。下面来一个我曾看过的一篇文章里谈到的一个错误使用:

1
2
3
4
5
6
7
8
9
10
11
12
public Observable<String> aaobservable() {
return Observable.just(value);
}
//调用以上方法创建一个Observable
Observable<String> aa = aaobservable();
value = "String";
aa.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, s);
}
});

以上代码你觉得运行的结果会是什么?会是 String 吗?结果在下面
你这么一说我就懂了.png

啥?为毛是null?很简单,上源码里看看就行了。首先点进just(T value)这个方法里瞧瞧,他究竟干了啥。

1
2
3
public final static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}

继续追踪,发现create()源码如下:

1
2
3
public static final <T> ScalarSynchronousObservable<T> create(T t) {
return new ScalarSynchronousObservable<T>(t);
}

我去,可算看到了,既然是调了构造方法,那应该不会再有什么幺蛾子了吧?进构造方法看一看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected ScalarSynchronousObservable(final T t) {
super(new OnSubscribe<T>() {

@Override
public void call(Subscriber<? super T> s) {
/*
* We don't check isUnsubscribed as it is a significant performance impact in the fast-path use cases.
* See PerfBaseline tests and https://github.com/ReactiveX/RxJava/issues/1383 for more information.
* The assumption here is that when asking for a single item we should emit it and not concern ourselves with
* being unsubscribed already. If the Subscriber unsubscribes at 0, they shouldn't have subscribed, or it will
* filter it out (such as take(0)). This prevents us from paying the price on every subscription.
*/
s.onNext(t);
s.onCompleted();
}

});
this.t = t;
}

首先抛开注释,直接调用了onNext和onCompleted,明显没其他多的东西。而那段注释,额用我比较差的英语水平来翻译一下第一句话……我们不检查是否被订阅,因为在用例中这会显著的影响性能。恩,出于各种考虑,他直接发射了数据,并不关心有没有人订阅之类的……哦,原来如此,在创建Observable的时候value还是null,在那时数据已经被发射了,之后再更改value的值也无济于事了。知晓了原因之后那该怎么解决这个问题呢?

很容易想到的一个方法是create()方法,毕竟这个方法会从头建造一个Observable,一切尽在你的掌握中。

defer

使用defer()来创建Observable会在有观察者订阅时才创建Observable,并且为每一个观察者创建一个新的Observable。回想一下,我们上面的代码之所以出现错误,就是因为过早的创建Observable和发射数据导致的。defer()这种在订阅时才创建是解决以上问题的方法之一,那么上代码:
创建Observable的方法:

1
2
3
4
5
6
7
8
public Observable<String> aaobservable() {
return Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.just(value);
}
});
}

验证:

1
2
3
4
5
6
7
8
Observable<String> aaobservable = aaobservable();
value = "String";
aaobservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, s);
}
});

成功

好了,对于just和defer就先告一段落,同时对于整个Observable也就告一段落了。可能到这你还觉得不够,毕竟归根到底我说到这也只讲了just和defer,不过我还是不打算继续下去了,我这里对于Observable的描述已经占去了太多的篇幅了。关于更多的创建操作我的建议是阅读官方文档。

操作符

Map

图来自于文档

在我看来这个操作符体现的是一种“一对一”的转换,比如你现在需要一张图,但是你的输入是一个string(这算是比较经典的场景了),你就可以使用如下代码进行变换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.just(str).map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
try {
URLConnection con = new URL(s).openConnection();
InputStream inputStream = con.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());

代码效果就不展示了,你懂就行。这篇文章里我暂时不会去解析map以及接下来会讲述的几个操作符的变换原理,留待以后更深入的了解之后再去用更清晰的语言来表述。

flatmap

上述map可以实现一对一的转换,那么flatmap则是实现一对多的转换。在RxJava的文档上是如此描述flatmap的:

将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable

图片来自官方文档

对于一个url你可以用map一对一的将其转换为一个bitmap,对于一个含有url的string数组你也可以采用以下的方式来转换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable.just(str).map(new Func1<String[], Bitmap>() {
@Override
public Bitmap call(String[] s) {
for (String a : s) {
try {
URLConnection con = new URL(a).openConnection();
InputStream inputStream = con.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
image.setImageBitmap(bitmap);
}
});

但是写完之后会不会觉得有点不得劲?感觉这很不RxJava,很让人不愉快?那就对了,人要对自己好一点,觉得不爽就换个flatmap试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable.from(str).flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.just(s);
}
}).map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
Bitmap bt = null;
try {
URL url = new URL(s);
URLConnection con = url.openConnection();
bt = BitmapFactory.decodeStream(con.getInputStream());
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return bt;
}
}).subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {

}
});

不管效果如何,这一条链子写下来就是让人很舒服~同时逻辑也非常的清晰,以后再改这种代码的时候可以少骂几句娘了。

小结

这一次关于RxJava的不能停到这里就要告一段落了,好在我已经把我想要写的都写的差不多了,不过这里的小结也不能光灌水。最近本来打算开个项目把gank io的api搞一下,不过最近突然就忙起来了,所以那个东西先搁置一下,关于我初学时的项目的重构已经在进行了。使用了MVP,部分逻辑使用了RxJava,不过因为是初学时写的,自己不满意的地方实在是太多了。以下的两点是我最近在使用RxJava时自己总结的一点经验,不足之处欢迎指出:

线程控制就交给RxJava去做

说实话,我还没有开始使用Retrofit,我使用的一直是Hongyang大神封装的OkhttpUtil,因为的确挺好用的。现在一般的请求框架也都会自己封装线程池,实现异步回调。我就在这上面吃了一个小亏,我在使用RxJava实现读取数据的逻辑的时候,先从网络获取数据,获取不到则读取本地缓存。结果每次读取的都是本地数据,这让我很奇怪,我打了log发现网络请求也是成功的。后来我有点明白了:

我使用okhttputil获取数据成功时回调,但是我在订阅的回调里面判断数据状态时,数据还没有获取到,所以执行了获取本地数据的逻辑。所以关于异步这个问题,我想说的就是写同步的代码,让RxJava做异步的事。当然,我的看法并不一定准确,你可以在底下评论说出你的想法,欢迎讨论。

最后再来个真正的小结吧,说实话,我觉得写这两篇RxJava的文章对我自己的帮助真的挺大的,让我对RxJava的认知又提升了一点。现在再看以前写的一些RxJava的代码,都觉得不够RxJava,所以要走的路还很长啊~