Home Manual Reference Source Test

src/pool.js

import { ExponentialBackoff } from "./backoff/exponential";
import { Host } from "./host";
import * as http from "http";
import * as https from "https";
import * as querystring from "querystring";
/**
 * Status codes that will cause a host to be marked as 'failed' if we get
 * them from a request to Influx.
 * @type {Array}
 */
const resubmitErrorCodes = [
    "ETIMEDOUT",
    "ESOCKETTIMEDOUT",
    "ECONNRESET",
    "ECONNREFUSED",
    "EHOSTUNREACH",
];
/**
 * An ServiceNotAvailableError is returned as an error from requests that
 * result in a > 500 error code.
 */
export class ServiceNotAvailableError extends Error {
    constructor(message) {
        super();
        this.message = message;
        Object.setPrototypeOf(this, ServiceNotAvailableError.prototype);
    }
}
/**
 * An RequestError is returned as an error from requests that
 * result in a 300 <= error code <= 500.
 */
export class RequestError extends Error {
    constructor(req, res, body) {
        super();
        this.req = req;
        this.res = res;
        this.message = `A ${res.statusCode} ${res.statusMessage} error occurred: ${body}`;
        Object.setPrototypeOf(this, RequestError.prototype);
    }
    static Create(req, res, callback) {
        let body = "";
        res.on("data", (str) => {
            body += str.toString();
        });
        res.on("end", () => callback(new RequestError(req, res, body)));
    }
}
/**
 * Creates a function generation that returns a wrapper which only allows
 * through the first call of any function that it generated.
 */
function doOnce() {
    let handled = false;
    return (fn) => {
        return (arg) => {
            if (handled) {
                return;
            }
            handled = true;
            fn(arg);
        };
    };
}
function setToArray(itemSet) {
    const output = [];
    itemSet.forEach((value) => {
        output.push(value);
    });
    return output;
}
const request = (options, callback) => {
    if (options.protocol === "https:") {
        return https.request(options, callback);
    }
    return http.request(options, callback);
};
/**
 *
 * The Pool maintains a list available Influx hosts and dispatches requests
 * to them. If there are errors connecting to hosts, it will disable that
 * host for a period of time.
 */
export class Pool {
    /**
     * Creates a new Pool instance.
     * @param {IPoolOptions} options
     */
    constructor(options) {
        this._options = Object.assign({ backoff: new ExponentialBackoff({
                initial: 300,
                max: 10 * 1000,
                random: 1,
            }), maxRetries: 2, requestTimeout: 30 * 1000 }, options);
        this._index = 0;
        this._hostsAvailable = new Set();
        this._hostsDisabled = new Set();
        this._timeout = this._options.requestTimeout;
    }
    /**
     * Returns a list of currently active hosts.
     * @return {Host[]}
     */
    getHostsAvailable() {
        return setToArray(this._hostsAvailable);
    }
    /**
     * Returns a list of hosts that are currently disabled due to network
     * errors.
     * @return {Host[]}
     */
    getHostsDisabled() {
        return setToArray(this._hostsDisabled);
    }
    /**
     * Inserts a new host to the pool.
     */
    addHost(url, options = {}) {
        const host = new Host(url, this._options.backoff.reset(), options);
        this._hostsAvailable.add(host);
        return host;
    }
    /**
     * Returns true if there's any host available to by queried.
     * @return {Boolean}
     */
    hostIsAvailable() {
        return this._hostsAvailable.size > 0;
    }
    /**
     * Makes a request and calls back with the response, parsed as JSON.
     * An error is returned on a non-2xx status code or on a parsing exception.
     */
    json(options) {
        return this.text(options).then((res) => JSON.parse(res));
    }
    /**
     * Makes a request and resolves with the plain text response,
     * if possible. An error is raised on a non-2xx status code.
     */
    text(options) {
        return new Promise((resolve, reject) => {
            this.stream(options, (err, res) => {
                if (err) {
                    return reject(err);
                }
                let output = "";
                res.on("data", (str) => {
                    output += str.toString();
                });
                res.on("end", () => resolve(output));
            });
        });
    }
    /**
     * Makes a request and discards any response body it receives.
     * An error is returned on a non-2xx status code.
     */
    discard(options) {
        return new Promise((resolve, reject) => {
            this.stream(options, (err, res) => {
                if (err) {
                    return reject(err);
                }
                res.on("data", () => {
                    /* ignore */
                });
                res.on("end", () => resolve());
            });
        });
    }
    /**
     * Ping sends out a request to all available Influx servers, reporting on
     * their response time and version number.
     */
    ping(timeout, path = "/ping", auth = undefined) {
        const todo = [];
        setToArray(this._hostsAvailable)
            .concat(setToArray(this._hostsDisabled))
            .forEach((host) => {
            const start = Date.now();
            const url = host.url;
            const once = doOnce();
            return todo.push(new Promise((resolve) => {
                const headers = {};
                if (typeof auth !== "undefined") {
                    const encodedAuth = Buffer.from(auth).toString("base64");
                    headers["Authorization"] = `Basic ${encodedAuth}`;
                }
                const req = request(Object.assign({ hostname: url.hostname, method: "GET", path, port: Number(url.port), protocol: url.protocol, timeout, headers: headers }, host.options), once((res) => {
                    resolve({
                        url,
                        res: res.resume(),
                        online: res.statusCode < 300,
                        rtt: Date.now() - start,
                        version: res.headers["x-influxdb-version"],
                    });
                }));
                const fail = once(() => {
                    req.abort();
                    resolve({
                        online: false,
                        res: null,
                        rtt: Infinity,
                        url,
                        version: null,
                    });
                });
                // Support older Nodes and polyfills which don't allow .timeout() in
                // the request options, wrapped in a conditional for even worse
                // polyfills. See: https://github.com/node-influx/node-influx/issues/221
                if (typeof req.setTimeout === "function") {
                    req.setTimeout(timeout, () => {
                        fail.call(fail, arguments);
                    }); // Tslint:disable-line
                }
                req.on("timeout", fail);
                req.on("error", fail);
                req.end();
            }));
        });
        return Promise.all(todo);
    }
    /**
     * Makes a request and calls back with the IncomingMessage stream,
     * if possible. An error is returned on a non-2xx status code.
     */
    stream(options, callback) {
        if (!this.hostIsAvailable()) {
            return callback(new ServiceNotAvailableError("No host available"), null);
        }
        const once = doOnce();
        const host = this._getHost();
        let path = host.url.pathname === "/" ? "" : host.url.pathname;
        path += options.path;
        if (options.query) {
            path += "?" + querystring.stringify(options.query);
        }
        const req = request(Object.assign({ headers: {
                "content-length": options.body ? Buffer.from(options.body).length : 0,
            }, hostname: host.url.hostname, method: options.method, path, port: Number(host.url.port), protocol: host.url.protocol, timeout: this._timeout }, host.options), once((res) => {
            res.setEncoding("utf8");
            if (res.statusCode >= 500) {
                res.on("data", () => {
                    /* ignore */
                });
                res.on("end", () => {
                    return this._handleRequestError(new ServiceNotAvailableError(res.statusMessage), host, options, callback);
                });
                return;
            }
            if (res.statusCode >= 300) {
                return RequestError.Create(req, res, (err) => callback(err, res));
            }
            host.success();
            return callback(undefined, res);
        }));
        // Handle network or HTTP parsing errors:
        req.on("error", once((err) => {
            this._handleRequestError(err, host, options, callback);
        }));
        // Handle timeouts:
        req.on("timeout", once(() => {
            req.abort();
            this._handleRequestError(new ServiceNotAvailableError("Request timed out"), host, options, callback);
        }));
        // Support older Nodes and polyfills which don't allow .timeout() in the
        // request options, wrapped in a conditional for even worse polyfills. See:
        // https://github.com/node-influx/node-influx/issues/221
        if (typeof req.setTimeout === "function") {
            req.setTimeout(host.options.timeout || this._timeout); // Tslint:disable-line
        }
        // Write out the body:
        if (options.body) {
            req.write(options.body);
        }
        req.end();
    }
    /**
     * Returns the next available host for querying.
     * @return {Host}
     */
    _getHost() {
        const available = setToArray(this._hostsAvailable);
        const host = available[this._index];
        this._index = (this._index + 1) % available.length;
        return host;
    }
    /**
     * Re-enables the provided host, returning it to the pool to query.
     * @param  {Host} host
     */
    _enableHost(host) {
        this._hostsDisabled.delete(host);
        this._hostsAvailable.add(host);
    }
    /**
     * Disables the provided host, removing it from the query pool. It will be
     * re-enabled after a backoff interval
     */
    _disableHost(host) {
        const delay = host.fail();
        if (delay > 0) {
            this._hostsAvailable.delete(host);
            this._hostsDisabled.add(host);
            this._index %= Math.max(1, this._hostsAvailable.size);
            setTimeout(() => this._enableHost(host), delay);
        }
    }
    _handleRequestError(err, host, options, callback) {
        if (!(err instanceof ServiceNotAvailableError) &&
            !resubmitErrorCodes.includes(err.code)) {
            return callback(err, null);
        }
        this._disableHost(host);
        const retries = options.retries || 0;
        if (retries < this._options.maxRetries && this.hostIsAvailable()) {
            options.retries = retries + 1;
            return this.stream(options, callback);
        }
        callback(err, null);
    }
}