1 /*
2  * Copyright 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.paging;
18 
19 import androidx.annotation.NonNull;
20 import androidx.annotation.Nullable;
21 import androidx.arch.core.executor.ArchTaskExecutor;
22 
23 import java.util.concurrent.Executor;
24 
25 import io.reactivex.BackpressureStrategy;
26 import io.reactivex.Flowable;
27 import io.reactivex.Observable;
28 import io.reactivex.ObservableEmitter;
29 import io.reactivex.ObservableOnSubscribe;
30 import io.reactivex.Scheduler;
31 import io.reactivex.functions.Cancellable;
32 import io.reactivex.schedulers.Schedulers;
33 
34 /**
35  * Builder for {@code Observable<PagedList>} or {@code Flowable<PagedList>}, given a
36  * {@link DataSource.Factory} and a {@link PagedList.Config}.
37  * <p>
38  * The required parameters are in the constructor, so you can simply construct and build, or
39  * optionally enable extra features (such as initial load key, or BoundaryCallback).
40  * <p>
41  * The returned observable/flowable will already be subscribed on the
42  * {@link #setFetchScheduler(Scheduler)}, and will perform all loading on that scheduler. It will
43  * already be observed on {@link #setNotifyScheduler(Scheduler)}, and will dispatch new PagedLists,
44  * as well as their updates to that scheduler.
45  *
46  * @param <Key> Type of input valued used to load data from the DataSource. Must be integer if
47  *             you're using PositionalDataSource.
48  * @param <Value> Item type being presented.
49  */
50 public final class RxPagedListBuilder<Key, Value> {
51     private Key mInitialLoadKey;
52     private PagedList.Config mConfig;
53     private DataSource.Factory<Key, Value> mDataSourceFactory;
54     private PagedList.BoundaryCallback mBoundaryCallback;
55     private Executor mNotifyExecutor;
56     private Executor mFetchExecutor;
57     private Scheduler mFetchScheduler;
58     private Scheduler mNotifyScheduler;
59 
60     /**
61      * Creates a RxPagedListBuilder with required parameters.
62      *
63      * @param dataSourceFactory DataSource factory providing DataSource generations.
64      * @param config Paging configuration.
65      */
RxPagedListBuilder(@onNull DataSource.Factory<Key, Value> dataSourceFactory, @NonNull PagedList.Config config)66     public RxPagedListBuilder(@NonNull DataSource.Factory<Key, Value> dataSourceFactory,
67             @NonNull PagedList.Config config) {
68         //noinspection ConstantConditions
69         if (config == null) {
70             throw new IllegalArgumentException("PagedList.Config must be provided");
71         }
72         //noinspection ConstantConditions
73         if (dataSourceFactory == null) {
74             throw new IllegalArgumentException("DataSource.Factory must be provided");
75         }
76         mDataSourceFactory = dataSourceFactory;
77         mConfig = config;
78     }
79 
80     /**
81      * Creates a RxPagedListBuilder with required parameters.
82      * <p>
83      * This method is a convenience for:
84      * <pre>
85      * RxPagedListBuilder(dataSourceFactory,
86      *         new PagedList.Config.Builder().setPageSize(pageSize).build())
87      * </pre>
88      *
89      * @param dataSourceFactory DataSource.Factory providing DataSource generations.
90      * @param pageSize Size of pages to load.
91      */
92     @SuppressWarnings("unused")
RxPagedListBuilder(@onNull DataSource.Factory<Key, Value> dataSourceFactory, int pageSize)93     public RxPagedListBuilder(@NonNull DataSource.Factory<Key, Value> dataSourceFactory,
94             int pageSize) {
95         this(dataSourceFactory, new PagedList.Config.Builder().setPageSize(pageSize).build());
96     }
97 
98     /**
99      * First loading key passed to the first PagedList/DataSource.
100      * <p>
101      * When a new PagedList/DataSource pair is created after the first, it acquires a load key from
102      * the previous generation so that data is loaded around the position already being observed.
103      *
104      * @param key Initial load key passed to the first PagedList/DataSource.
105      * @return this
106      */
107     @SuppressWarnings("unused")
108     @NonNull
setInitialLoadKey(@ullable Key key)109     public RxPagedListBuilder<Key, Value> setInitialLoadKey(@Nullable Key key) {
110         mInitialLoadKey = key;
111         return this;
112     }
113 
114     /**
115      * Sets a {@link PagedList.BoundaryCallback} on each PagedList created, typically used to load
116      * additional data from network when paging from local storage.
117      * <p>
118      * Pass a BoundaryCallback to listen to when the PagedList runs out of data to load. If this
119      * method is not called, or {@code null} is passed, you will not be notified when each
120      * DataSource runs out of data to provide to its PagedList.
121      * <p>
122      * If you are paging from a DataSource.Factory backed by local storage, you can set a
123      * BoundaryCallback to know when there is no more information to page from local storage.
124      * This is useful to page from the network when local storage is a cache of network data.
125      * <p>
126      * Note that when using a BoundaryCallback with a {@code Observable<PagedList>}, method calls
127      * on the callback may be dispatched multiple times - one for each PagedList/DataSource
128      * pair. If loading network data from a BoundaryCallback, you should prevent multiple
129      * dispatches of the same method from triggering multiple simultaneous network loads.
130      *
131      * @param boundaryCallback The boundary callback for listening to PagedList load state.
132      * @return this
133      */
134     @SuppressWarnings("unused")
135     @NonNull
setBoundaryCallback( @ullable PagedList.BoundaryCallback<Value> boundaryCallback)136     public RxPagedListBuilder<Key, Value> setBoundaryCallback(
137             @Nullable PagedList.BoundaryCallback<Value> boundaryCallback) {
138         mBoundaryCallback = boundaryCallback;
139         return this;
140     }
141 
142     /**
143      * Sets scheduler which will be used for observing new PagedLists, as well as loading updates
144      * within the PagedLists.
145      * <p>
146      * The built observable will be {@link Observable#observeOn(Scheduler) observed on} this
147      * scheduler, so that the thread receiving PagedLists will also receive the internal updates to
148      * the PagedList.
149      *
150      * @param scheduler Scheduler for background DataSource loading.
151      * @return this
152      */
setNotifyScheduler( final @NonNull Scheduler scheduler)153     public RxPagedListBuilder<Key, Value> setNotifyScheduler(
154             final @NonNull Scheduler scheduler) {
155         mNotifyScheduler = scheduler;
156         final Scheduler.Worker worker = scheduler.createWorker();
157         mNotifyExecutor = new Executor() {
158             @Override
159             public void execute(@NonNull Runnable command) {
160                 // We use a worker here since the page load notifications
161                 // should not be dispatched in parallel
162                 worker.schedule(command);
163             }
164         };
165         return this;
166     }
167 
168     /**
169      * Sets scheduler which will be used for background fetching of PagedLists, as well as on-demand
170      * fetching of pages inside.
171      *
172      * @param scheduler Scheduler for background DataSource loading.
173      * @return this
174      */
175     @SuppressWarnings({"unused", "WeakerAccess"})
176     @NonNull
setFetchScheduler( final @NonNull Scheduler scheduler)177     public RxPagedListBuilder<Key, Value> setFetchScheduler(
178             final @NonNull Scheduler scheduler) {
179         mFetchExecutor = new Executor() {
180             @Override
181             public void execute(@NonNull Runnable command) {
182                 // We use scheduleDirect since the page loads that use
183                 // executor are intentionally parallel.
184                 scheduler.scheduleDirect(command);
185             }
186         };
187         mFetchScheduler = scheduler;
188         return this;
189     }
190 
191     /**
192      * Constructs a {@code Observable<PagedList>}.
193      * <p>
194      * The returned Observable will already be observed on the
195      * {@link #setNotifyScheduler(Scheduler) notify scheduler}, and subscribed on the
196      * {@link #setFetchScheduler(Scheduler) fetch scheduler}.
197      *
198      * @return The Observable of PagedLists
199      */
200     @NonNull
buildObservable()201     public Observable<PagedList<Value>> buildObservable() {
202         if (mNotifyExecutor == null) {
203             mNotifyExecutor = ArchTaskExecutor.getMainThreadExecutor();
204             mNotifyScheduler = Schedulers.from(mNotifyExecutor);
205         }
206         if (mFetchExecutor == null) {
207             mFetchExecutor = ArchTaskExecutor.getIOThreadExecutor();
208             mFetchScheduler = Schedulers.from(mFetchExecutor);
209         }
210         return Observable.create(new PagingObservableOnSubscribe<>(
211                 mInitialLoadKey,
212                 mConfig,
213                 mBoundaryCallback,
214                 mDataSourceFactory,
215                 mNotifyExecutor,
216                 mFetchExecutor))
217                         .observeOn(mNotifyScheduler)
218                         .subscribeOn(mFetchScheduler);
219     }
220 
221     /**
222      * Constructs a {@code Flowable<PagedList>}.
223      *
224      * The returned Observable will already be observed on the
225      * {@link #setNotifyScheduler(Scheduler) notify scheduler}, and subscribed on the
226      * {@link #setFetchScheduler(Scheduler) fetch scheduler}.
227      *
228      * @param backpressureStrategy BackpressureStrategy for the Flowable to use.
229      * @return The Flowable of PagedLists
230      */
231     @NonNull
buildFlowable(BackpressureStrategy backpressureStrategy)232     public Flowable<PagedList<Value>> buildFlowable(BackpressureStrategy backpressureStrategy) {
233         return buildObservable()
234                 .toFlowable(backpressureStrategy);
235     }
236 
237     static class PagingObservableOnSubscribe<Key, Value>
238             implements ObservableOnSubscribe<PagedList<Value>>, DataSource.InvalidatedCallback,
239             Cancellable,
240             Runnable {
241 
242         @Nullable
243         private final Key mInitialLoadKey;
244         @NonNull
245         private final PagedList.Config mConfig;
246         @Nullable
247         private final PagedList.BoundaryCallback mBoundaryCallback;
248         @NonNull
249         private final DataSource.Factory<Key, Value> mDataSourceFactory;
250         @NonNull
251         private final Executor mNotifyExecutor;
252         @NonNull
253         private final Executor mFetchExecutor;
254 
255         @Nullable
256         private PagedList<Value> mList;
257         @Nullable
258         private DataSource<Key, Value> mDataSource;
259 
260         private ObservableEmitter<PagedList<Value>> mEmitter;
261 
PagingObservableOnSubscribe(@ullable Key initialLoadKey, @NonNull PagedList.Config config, @Nullable PagedList.BoundaryCallback boundaryCallback, @NonNull DataSource.Factory<Key, Value> dataSourceFactory, @NonNull Executor notifyExecutor, @NonNull Executor fetchExecutor)262         private PagingObservableOnSubscribe(@Nullable Key initialLoadKey,
263                 @NonNull PagedList.Config config,
264                 @Nullable PagedList.BoundaryCallback boundaryCallback,
265                 @NonNull DataSource.Factory<Key, Value> dataSourceFactory,
266                 @NonNull Executor notifyExecutor,
267                 @NonNull Executor fetchExecutor) {
268             mInitialLoadKey = initialLoadKey;
269             mConfig = config;
270             mBoundaryCallback = boundaryCallback;
271             mDataSourceFactory = dataSourceFactory;
272             mNotifyExecutor = notifyExecutor;
273             mFetchExecutor = fetchExecutor;
274         }
275 
276         @Override
subscribe(ObservableEmitter<PagedList<Value>> emitter)277         public void subscribe(ObservableEmitter<PagedList<Value>> emitter)
278                 throws Exception {
279             mEmitter = emitter;
280             mEmitter.setCancellable(this);
281 
282             // known that subscribe is already on fetchScheduler
283             mEmitter.onNext(createPagedList());
284         }
285 
286         @Override
cancel()287         public void cancel() throws Exception {
288             if (mDataSource != null) {
289                 mDataSource.removeInvalidatedCallback(this);
290             }
291         }
292 
293         @Override
run()294         public void run() {
295             // fetch data, run on fetchExecutor
296             mEmitter.onNext(createPagedList());
297         }
298 
299         @Override
onInvalidated()300         public void onInvalidated() {
301             if (!mEmitter.isDisposed()) {
302                 mFetchExecutor.execute(this);
303             }
304         }
305 
createPagedList()306         private PagedList<Value> createPagedList() {
307             @Nullable Key initializeKey = mInitialLoadKey;
308             if (mList != null) {
309                 //noinspection unchecked
310                 initializeKey = (Key) mList.getLastKey();
311             }
312 
313             do {
314                 if (mDataSource != null) {
315                     mDataSource.removeInvalidatedCallback(this);
316                 }
317                 mDataSource = mDataSourceFactory.create();
318                 mDataSource.addInvalidatedCallback(this);
319 
320                 mList = new PagedList.Builder<>(mDataSource, mConfig)
321                         .setNotifyExecutor(mNotifyExecutor)
322                         .setFetchExecutor(mFetchExecutor)
323                         .setBoundaryCallback(mBoundaryCallback)
324                         .setInitialKey(initializeKey)
325                         .build();
326             } while (mList.isDetached());
327             return mList;
328         }
329     }
330 }
331