SubscribeOn and ObserveOn
我们学到了如何在一个调度器上运行一个任务。但是我们如何利用它来和Observables一起工作呢?RxJava提供了subscribeOn()方法来用于每个Observable对象。subscribeOn()方法用Scheduler来作为参数并在这个Scheduler上执行Observable调用。
在“真实世界”这个例子中,我们调整loadList()函数。首先,我们需要一个新的getApps()方法来检索已安装的应用列表:
1
private Observable<AppInfo> getApps() {
2
return Observable.create(subscriber -> {
3
List<AppInfo> apps = new ArrayList<>();
4
SharedPreferences sharedPref = getActivity().getPreferences(Context.MODE_PRIVATE);
5
Type appInfoType = new TypeToken<List<AppInfo>>(){}.getType();
6
String serializedApps = sharedPref.getString("APPS", "");
7
if (!"".equals(serializedApps)) {
8
apps = new Gson().fromJson(serializedApps,appInfoType);
9
}
10
for (AppInfo app : apps) {
11
subscriber.onNext(app);
12
}
13
subscriber.onCompleted();
14
});
15
}
Copied!
getApps()方法返回一个AppInfo的Observable。它先从Android的SharePreferences读取到已安装的应用程序列表。反序列化,并一个接一个的发射AppInfo数据。使用新的方法来检索列表,loadList()函数改成下面这样:
1
private void loadList() {
2
mRecyclerView.setVisibility(View.VISIBLE);
3
getApps().subscribe(new Observer<AppInfo>() {
4
@Override
5
public void onCompleted() {
6
mSwipeRefreshLayout.setRefreshing(false);
7
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
8
}
9
10
@Override
11
public void onError(Throwable e) {
12
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
13
mSwipeRefreshLayout.setRefreshing(false);
14
}
15
16
@Override
17
public void onNext(AppInfo appInfo) {
18
mAddedApps.add(appInfo);
19
mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
20
}
21
});
22
}
Copied!
如果我们运行代码,StrictMode将会报告一个不合规操作,这是因为SharePreferences会减慢I/O操作。我们所需要做的是指定getApps()需要在调度器上执行:
1
getApps().subscribeOn(Schedulers.io())
2
.subscribe(new Observer<AppInfo>() { [...]
Copied!
Schedulers.io()将会去掉StrictMode的不合规操作,但是我们的App现在崩溃了是因为:
1
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.jav a:58)
2
at java.util.concurrent.Executors$RunnableAdapter.call(Executors. java:422)
3
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
4
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutu reTask.access$201(ScheduledThreadPoolExecutor.java:152)
5
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutu reTask.run(ScheduledThreadPoolExecutor.java:265)
6
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolEx ecutor.java:1112)
7
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolE xecutor.java:587)
8
at java.lang.Thread.run(Thread.java:841) Caused by:
9
android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.
Copied!
Only the original thread that created a view hierarchy can touch its views.
我们再次回到Android的世界。这条信息简单的告诉我们我们试图在一个非UI线程来修改UI操作。意思是我们需要在I/O调度器上执行我们的代码。因此我们需要和I/O调度器一起执行代码,但是当结果返回时我们需要在UI线程上操作。RxJava让你能够订阅一个指定的调度器并观察它。我们只需在loadList()函数添加几行代码,那么每一项就都准备好了:
1
getApps()
2
.onBackpressureBuffer()
3
.subscribeOn(Schedulers.io())
4
.observeOn(AndroidSchedulers.mainThread())
5
.subscribe(new Observer<AppInfo>() { [...]
Copied!
observeOn()方法将会在指定的调度器上返回结果:如例子中的UI线程。onBackpressureBuffer()方法将告诉Observable发射的数据如果比观察者消费的数据要更快的话,它必须把它们存储在缓存中并提供一个合适的时间给它们。做完这些工作之后,如果我们运行App,就会出现已安装的程序列表:
Copy link