1 /* 2 * Copyright 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package androidx.paging; 18 19 import androidx.annotation.NonNull; 20 import androidx.annotation.Nullable; 21 import androidx.arch.core.executor.ArchTaskExecutor; 22 23 import java.util.concurrent.Executor; 24 25 import io.reactivex.BackpressureStrategy; 26 import io.reactivex.Flowable; 27 import io.reactivex.Observable; 28 import io.reactivex.ObservableEmitter; 29 import io.reactivex.ObservableOnSubscribe; 30 import io.reactivex.Scheduler; 31 import io.reactivex.functions.Cancellable; 32 import io.reactivex.schedulers.Schedulers; 33 34 /** 35 * Builder for {@code Observable<PagedList>} or {@code Flowable<PagedList>}, given a 36 * {@link DataSource.Factory} and a {@link PagedList.Config}. 37 * <p> 38 * The required parameters are in the constructor, so you can simply construct and build, or 39 * optionally enable extra features (such as initial load key, or BoundaryCallback). 40 * <p> 41 * The returned observable/flowable will already be subscribed on the 42 * {@link #setFetchScheduler(Scheduler)}, and will perform all loading on that scheduler. It will 43 * already be observed on {@link #setNotifyScheduler(Scheduler)}, and will dispatch new PagedLists, 44 * as well as their updates to that scheduler. 45 * 46 * @param <Key> Type of input valued used to load data from the DataSource. Must be integer if 47 * you're using PositionalDataSource. 48 * @param <Value> Item type being presented. 49 */ 50 public final class RxPagedListBuilder<Key, Value> { 51 private Key mInitialLoadKey; 52 private PagedList.Config mConfig; 53 private DataSource.Factory<Key, Value> mDataSourceFactory; 54 private PagedList.BoundaryCallback mBoundaryCallback; 55 private Executor mNotifyExecutor; 56 private Executor mFetchExecutor; 57 private Scheduler mFetchScheduler; 58 private Scheduler mNotifyScheduler; 59 60 /** 61 * Creates a RxPagedListBuilder with required parameters. 62 * 63 * @param dataSourceFactory DataSource factory providing DataSource generations. 64 * @param config Paging configuration. 65 */ RxPagedListBuilder(@onNull DataSource.Factory<Key, Value> dataSourceFactory, @NonNull PagedList.Config config)66 public RxPagedListBuilder(@NonNull DataSource.Factory<Key, Value> dataSourceFactory, 67 @NonNull PagedList.Config config) { 68 //noinspection ConstantConditions 69 if (config == null) { 70 throw new IllegalArgumentException("PagedList.Config must be provided"); 71 } 72 //noinspection ConstantConditions 73 if (dataSourceFactory == null) { 74 throw new IllegalArgumentException("DataSource.Factory must be provided"); 75 } 76 mDataSourceFactory = dataSourceFactory; 77 mConfig = config; 78 } 79 80 /** 81 * Creates a RxPagedListBuilder with required parameters. 82 * <p> 83 * This method is a convenience for: 84 * <pre> 85 * RxPagedListBuilder(dataSourceFactory, 86 * new PagedList.Config.Builder().setPageSize(pageSize).build()) 87 * </pre> 88 * 89 * @param dataSourceFactory DataSource.Factory providing DataSource generations. 90 * @param pageSize Size of pages to load. 91 */ 92 @SuppressWarnings("unused") RxPagedListBuilder(@onNull DataSource.Factory<Key, Value> dataSourceFactory, int pageSize)93 public RxPagedListBuilder(@NonNull DataSource.Factory<Key, Value> dataSourceFactory, 94 int pageSize) { 95 this(dataSourceFactory, new PagedList.Config.Builder().setPageSize(pageSize).build()); 96 } 97 98 /** 99 * First loading key passed to the first PagedList/DataSource. 100 * <p> 101 * When a new PagedList/DataSource pair is created after the first, it acquires a load key from 102 * the previous generation so that data is loaded around the position already being observed. 103 * 104 * @param key Initial load key passed to the first PagedList/DataSource. 105 * @return this 106 */ 107 @SuppressWarnings("unused") 108 @NonNull setInitialLoadKey(@ullable Key key)109 public RxPagedListBuilder<Key, Value> setInitialLoadKey(@Nullable Key key) { 110 mInitialLoadKey = key; 111 return this; 112 } 113 114 /** 115 * Sets a {@link PagedList.BoundaryCallback} on each PagedList created, typically used to load 116 * additional data from network when paging from local storage. 117 * <p> 118 * Pass a BoundaryCallback to listen to when the PagedList runs out of data to load. If this 119 * method is not called, or {@code null} is passed, you will not be notified when each 120 * DataSource runs out of data to provide to its PagedList. 121 * <p> 122 * If you are paging from a DataSource.Factory backed by local storage, you can set a 123 * BoundaryCallback to know when there is no more information to page from local storage. 124 * This is useful to page from the network when local storage is a cache of network data. 125 * <p> 126 * Note that when using a BoundaryCallback with a {@code Observable<PagedList>}, method calls 127 * on the callback may be dispatched multiple times - one for each PagedList/DataSource 128 * pair. If loading network data from a BoundaryCallback, you should prevent multiple 129 * dispatches of the same method from triggering multiple simultaneous network loads. 130 * 131 * @param boundaryCallback The boundary callback for listening to PagedList load state. 132 * @return this 133 */ 134 @SuppressWarnings("unused") 135 @NonNull setBoundaryCallback( @ullable PagedList.BoundaryCallback<Value> boundaryCallback)136 public RxPagedListBuilder<Key, Value> setBoundaryCallback( 137 @Nullable PagedList.BoundaryCallback<Value> boundaryCallback) { 138 mBoundaryCallback = boundaryCallback; 139 return this; 140 } 141 142 /** 143 * Sets scheduler which will be used for observing new PagedLists, as well as loading updates 144 * within the PagedLists. 145 * <p> 146 * The built observable will be {@link Observable#observeOn(Scheduler) observed on} this 147 * scheduler, so that the thread receiving PagedLists will also receive the internal updates to 148 * the PagedList. 149 * 150 * @param scheduler Scheduler for background DataSource loading. 151 * @return this 152 */ setNotifyScheduler( final @NonNull Scheduler scheduler)153 public RxPagedListBuilder<Key, Value> setNotifyScheduler( 154 final @NonNull Scheduler scheduler) { 155 mNotifyScheduler = scheduler; 156 final Scheduler.Worker worker = scheduler.createWorker(); 157 mNotifyExecutor = new Executor() { 158 @Override 159 public void execute(@NonNull Runnable command) { 160 // We use a worker here since the page load notifications 161 // should not be dispatched in parallel 162 worker.schedule(command); 163 } 164 }; 165 return this; 166 } 167 168 /** 169 * Sets scheduler which will be used for background fetching of PagedLists, as well as on-demand 170 * fetching of pages inside. 171 * 172 * @param scheduler Scheduler for background DataSource loading. 173 * @return this 174 */ 175 @SuppressWarnings({"unused", "WeakerAccess"}) 176 @NonNull setFetchScheduler( final @NonNull Scheduler scheduler)177 public RxPagedListBuilder<Key, Value> setFetchScheduler( 178 final @NonNull Scheduler scheduler) { 179 mFetchExecutor = new Executor() { 180 @Override 181 public void execute(@NonNull Runnable command) { 182 // We use scheduleDirect since the page loads that use 183 // executor are intentionally parallel. 184 scheduler.scheduleDirect(command); 185 } 186 }; 187 mFetchScheduler = scheduler; 188 return this; 189 } 190 191 /** 192 * Constructs a {@code Observable<PagedList>}. 193 * <p> 194 * The returned Observable will already be observed on the 195 * {@link #setNotifyScheduler(Scheduler) notify scheduler}, and subscribed on the 196 * {@link #setFetchScheduler(Scheduler) fetch scheduler}. 197 * 198 * @return The Observable of PagedLists 199 */ 200 @NonNull buildObservable()201 public Observable<PagedList<Value>> buildObservable() { 202 if (mNotifyExecutor == null) { 203 mNotifyExecutor = ArchTaskExecutor.getMainThreadExecutor(); 204 mNotifyScheduler = Schedulers.from(mNotifyExecutor); 205 } 206 if (mFetchExecutor == null) { 207 mFetchExecutor = ArchTaskExecutor.getIOThreadExecutor(); 208 mFetchScheduler = Schedulers.from(mFetchExecutor); 209 } 210 return Observable.create(new PagingObservableOnSubscribe<>( 211 mInitialLoadKey, 212 mConfig, 213 mBoundaryCallback, 214 mDataSourceFactory, 215 mNotifyExecutor, 216 mFetchExecutor)) 217 .observeOn(mNotifyScheduler) 218 .subscribeOn(mFetchScheduler); 219 } 220 221 /** 222 * Constructs a {@code Flowable<PagedList>}. 223 * 224 * The returned Observable will already be observed on the 225 * {@link #setNotifyScheduler(Scheduler) notify scheduler}, and subscribed on the 226 * {@link #setFetchScheduler(Scheduler) fetch scheduler}. 227 * 228 * @param backpressureStrategy BackpressureStrategy for the Flowable to use. 229 * @return The Flowable of PagedLists 230 */ 231 @NonNull buildFlowable(BackpressureStrategy backpressureStrategy)232 public Flowable<PagedList<Value>> buildFlowable(BackpressureStrategy backpressureStrategy) { 233 return buildObservable() 234 .toFlowable(backpressureStrategy); 235 } 236 237 static class PagingObservableOnSubscribe<Key, Value> 238 implements ObservableOnSubscribe<PagedList<Value>>, DataSource.InvalidatedCallback, 239 Cancellable, 240 Runnable { 241 242 @Nullable 243 private final Key mInitialLoadKey; 244 @NonNull 245 private final PagedList.Config mConfig; 246 @Nullable 247 private final PagedList.BoundaryCallback mBoundaryCallback; 248 @NonNull 249 private final DataSource.Factory<Key, Value> mDataSourceFactory; 250 @NonNull 251 private final Executor mNotifyExecutor; 252 @NonNull 253 private final Executor mFetchExecutor; 254 255 @Nullable 256 private PagedList<Value> mList; 257 @Nullable 258 private DataSource<Key, Value> mDataSource; 259 260 private ObservableEmitter<PagedList<Value>> mEmitter; 261 PagingObservableOnSubscribe(@ullable Key initialLoadKey, @NonNull PagedList.Config config, @Nullable PagedList.BoundaryCallback boundaryCallback, @NonNull DataSource.Factory<Key, Value> dataSourceFactory, @NonNull Executor notifyExecutor, @NonNull Executor fetchExecutor)262 private PagingObservableOnSubscribe(@Nullable Key initialLoadKey, 263 @NonNull PagedList.Config config, 264 @Nullable PagedList.BoundaryCallback boundaryCallback, 265 @NonNull DataSource.Factory<Key, Value> dataSourceFactory, 266 @NonNull Executor notifyExecutor, 267 @NonNull Executor fetchExecutor) { 268 mInitialLoadKey = initialLoadKey; 269 mConfig = config; 270 mBoundaryCallback = boundaryCallback; 271 mDataSourceFactory = dataSourceFactory; 272 mNotifyExecutor = notifyExecutor; 273 mFetchExecutor = fetchExecutor; 274 } 275 276 @Override subscribe(ObservableEmitter<PagedList<Value>> emitter)277 public void subscribe(ObservableEmitter<PagedList<Value>> emitter) 278 throws Exception { 279 mEmitter = emitter; 280 mEmitter.setCancellable(this); 281 282 // known that subscribe is already on fetchScheduler 283 mEmitter.onNext(createPagedList()); 284 } 285 286 @Override cancel()287 public void cancel() throws Exception { 288 if (mDataSource != null) { 289 mDataSource.removeInvalidatedCallback(this); 290 } 291 } 292 293 @Override run()294 public void run() { 295 // fetch data, run on fetchExecutor 296 mEmitter.onNext(createPagedList()); 297 } 298 299 @Override onInvalidated()300 public void onInvalidated() { 301 if (!mEmitter.isDisposed()) { 302 mFetchExecutor.execute(this); 303 } 304 } 305 createPagedList()306 private PagedList<Value> createPagedList() { 307 @Nullable Key initializeKey = mInitialLoadKey; 308 if (mList != null) { 309 //noinspection unchecked 310 initializeKey = (Key) mList.getLastKey(); 311 } 312 313 do { 314 if (mDataSource != null) { 315 mDataSource.removeInvalidatedCallback(this); 316 } 317 mDataSource = mDataSourceFactory.create(); 318 mDataSource.addInvalidatedCallback(this); 319 320 mList = new PagedList.Builder<>(mDataSource, mConfig) 321 .setNotifyExecutor(mNotifyExecutor) 322 .setFetchExecutor(mFetchExecutor) 323 .setBoundaryCallback(mBoundaryCallback) 324 .setInitialKey(initializeKey) 325 .build(); 326 } while (mList.isDetached()); 327 return mList; 328 } 329 } 330 } 331