import { Observable } from '../../Observable'; import { Subscription } from '../../Subscription'; import { from } from '../../observable/from'; import { ObservableInput } from '../../types'; export function fromFetch( input: string | Request, init: RequestInit & { selector: (response: Response) => ObservableInput } ): Observable; export function fromFetch( input: string | Request, init?: RequestInit ): Observable; /** * Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to * make an HTTP request. * * **WARNING** Parts of the fetch API are still experimental. `AbortController` is * required for this implementation to work and use cancellation appropriately. * * Will automatically set up an internal [AbortController](https://developer.mozilla.org/en-US/docs/Web/API/AbortController) * in order to teardown the internal `fetch` when the subscription tears down. * * If a `signal` is provided via the `init` argument, it will behave like it usually does with * `fetch`. If the provided `signal` aborts, the error that `fetch` normally rejects with * in that scenario will be emitted as an error from the observable. * * ### Basic Use * * ```ts * import { of } from 'rxjs'; * import { fromFetch } from 'rxjs/fetch'; * import { switchMap, catchError } from 'rxjs/operators'; * * const data$ = fromFetch('https://api.github.com/users?per_page=5').pipe( * switchMap(response => { * if (response.ok) { * // OK return data * return response.json(); * } else { * // Server is returning a status requiring the client to try something else. * return of({ error: true, message: `Error ${response.status}` }); * } * }), * catchError(err => { * // Network or other error, handle appropriately * console.error(err); * return of({ error: true, message: err.message }) * }) * ); * * data$.subscribe({ * next: result => console.log(result), * complete: () => console.log('done') * }); * ``` * * ### Use with Chunked Transfer Encoding * * With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1), * the promise returned by `fetch` will resolve as soon as the response's headers are * received. * * That means the `fromFetch` observable will emit a `Response` - and will * then complete - before the body is received. When one of the methods on the * `Response` - like `text()` or `json()` - is called, the returned promise will not * resolve until the entire body has been received. Unsubscribing from any observable * that uses the promise as an observable input will not abort the request. * * To facilitate aborting the retrieval of responses that use chunked transfer encoding, * a `selector` can be specified via the `init` parameter: * * ```ts * import { of } from 'rxjs'; * import { fromFetch } from 'rxjs/fetch'; * * const data$ = fromFetch('https://api.github.com/users?per_page=5', { * selector: response => response.json() * }); * * data$.subscribe({ * next: result => console.log(result), * complete: () => console.log('done') * }); * ``` * * @param input The resource you would like to fetch. Can be a url or a request object. * @param init A configuration object for the fetch. * [See MDN for more details](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch#Parameters) * @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch` * function. The {@link Subscription} is tied to an `AbortController` for the the fetch. */ export function fromFetch( input: string | Request, initWithSelector: RequestInit & { selector?: (response: Response) => ObservableInput } = {} ): Observable { const { selector, ...init } = initWithSelector; return new Observable(subscriber => { const controller = new AbortController(); const signal = controller.signal; let abortable = true; let unsubscribed = false; const subscription = new Subscription(); subscription.add(() => { unsubscribed = true; if (abortable) { controller.abort(); } }); let perSubscriberInit: RequestInit; if (init) { // If a signal is provided, just have it teardown. It's a cancellation token, basically. if (init.signal) { if (init.signal.aborted) { controller.abort(); } else { const outerSignal = init.signal; const outerSignalHandler = () => { if (!signal.aborted) { controller.abort(); } }; outerSignal.addEventListener('abort', outerSignalHandler); subscription.add(() => outerSignal.removeEventListener('abort', outerSignalHandler)); } } // init cannot be mutated or reassigned as it's closed over by the // subscriber callback and is shared between subscribers. perSubscriberInit = { ...init, signal }; } else { perSubscriberInit = { signal }; } fetch(input, perSubscriberInit).then(response => { if (selector) { subscription.add(from(selector(response)).subscribe( value => subscriber.next(value), err => { abortable = false; if (!unsubscribed) { // Only forward the error if it wasn't an abort. subscriber.error(err); } }, () => { abortable = false; subscriber.complete(); } )); } else { abortable = false; subscriber.next(response); subscriber.complete(); } }).catch(err => { abortable = false; if (!unsubscribed) { // Only forward the error if it wasn't an abort. subscriber.error(err); } }); return subscription; }); }