/**
 *  USAGE EXAMPLE:
 *
 *  export default class TopicGateway extends MqttGateway {
 *
 *    // an error will be thrown if these don't exist
 *    get version() { return '1'; }
 *    get topicPattern() { return 'customers/+/conversations/+'; }
 *
 *    // required, an error will be thrown if this doesn't exist
 *    // receives updates or additions of a _single_ entity
 *    onReceive(entityDto, topic, parsedTopicParams, correlationId) {
 *      this.notifyObservers('handleEntity', entityDto);
 *    }
 *
 *    // required for fetch, an error will be thrown if `_fetch` is called and this method doesn't exist
 *    // the `fetchResultDto` argument contains an array of entity DTOs in case of a collection subscription
 *    // like `agents/+`, OR a single entity DTO in case of a single entity subscription like `agent/123`
 *    onFetchResponse(fetchResultDto, topic, parsedTopicParams, correlationId) {
 *      this.notifyObservers('handleRequestedEntities', fetchResultDto);
 *    }
 *
 *    // required for fetch, an error will be thrown if `_fetch` is called and this method doesn't exist
 *    onFetchError(errorDto, topic, parsedTopicParams, correlationId) {
 *      this.notifyObservers('handleFetchError', errorDto);
 *    }
 *
 *    // optional, if defined, the gateway will subscribe to delete commands
 *    onDelete(topic, parsedTopicParams, correlationId) {
 *      this.notifyObservers('handleDelete', parsedTopicParams);
 *    }
 *
 *    add(customerId, payload) {
 *      // Do custom work here if needed
 *
 *      super._add([customerId], payload);
 *        // pub v1/requestor/<requestorId>/customer/<customerId>/conversations/command/add
 *    }
 *
 *    // optional, if defined the gateway will subscribe to the corresponding response topic
 *    on(Delete|Update)Success(payload, topic, parsedTopicParams, correlationId) {
 *      var entityId = _.last(parsedTopicParams);
 *      this.notifyObservers('handle(Delete|Update)Success', entityId, payload);
 *    }
 *
 *    // optional, if defined the gateway will subscribe to the corresponding response topic
 *    on(Delete|Update)Errors(errorDtos, topic, parsedTopicParams, correlationId) {
 *      var entityId = _.last(parsedTopicParams);
 *      this.notifyObservers('handle(Delete|Update)Errors', entityId, errorDtos);
 *    }
 *
 *    // Other available actions that can be overridden:
 *    _update: function([customerId], payload)
 *      // pub v1/requestor/<requestorId>/customer/<customerId>/conversations/command/update
 *    _delete: function([customerId])
 *      // pub v1/requestor/<requestorId>/customer/<customerId>/conversations/command/delete
 *
 *    // available methods that can be overridden:
 *    baseTopicPattern: function()
 *      // override if the base topic pattern must follow a different structure
 *      // for example, if the orgId is not required for the topic pattern
 *    createTopicPattern: function(topicPattern)
 *      // override to customize parameterization of a topic pattern
 *  }
 *
 *  // AND.. the following methods will be added:
 *     (where the first argument is always an array of parameters to build the topic)
 *  {
 *    _subscribeAndFetch: function([customerId])
 *      // sub v1/customer/<customerId>/conversations/+
 *      // sub v1/customer/<customerId>/conversations/+/event/delete
 *      // sub v1/requestor/<requestorId>/customer/+/conversations
 *      // pub v1/requestor/<requestorId>/customer/<customerId>/conversations/command/fetch
 *    _subscribe: function([customerId])
 *      // sub v1/customer/<customerId>/conversations/+
 *      // sub v1/customer/<customerId>/conversations/+/event/delete
 *    _unsubscribe: function([customerId])
 *      // unsub v1/company/<companyId>/customer/<customerId>/conversations/+
 *      // unsub v1/company/<companyId>/customer/<customerId>/conversations/+/event/delete
 *      // ... and if a fetch is in progress it will also be unsubscribed from
 *
 *    // fetch, despite being a "command", is always included
 *    // and the fetch command is special because it requires a requestorId
 *    // to achieve a request/response flow over pub sub
 *    _fetch: function([customerId], payload)
 *      // sub v1/requestor/<requestorId>/customer/+/conversations
 *      // pub v1/requestor/<requestorId>/customer/<customerId>/conversations/command/fetch
 *  }
 */

import _ from 'lodash';

import IdGenerator from 'scripts/domain/contracts/id_generator';
import Observable from 'scripts/lib/observable_mixin';

/**
 * Base class for gateways that exchange entities with a pub/sub backend.
 *
 * Two topic patterns are derived from the subclass' `version` and
 * `topicPattern` getters:
 *  - the broadcast pattern (`v<version>/<baseTopicPattern>`), used for
 *    receiving entity updates and delete events, and
 *  - the requestor pattern (`v<version>/requestor/<requestorId>/<baseTopicPattern>`),
 *    used for request/response style commands (add, update, delete, fetch).
 *
 * Subclasses must provide `version`, `topicPattern` and `onReceive`; the
 * remaining `on*` hooks are looked up when the corresponding command is issued.
 */
export default class MqttGateway {
  /**
   * @param {Object} backend - transport; this class calls its `publish`,
   *   `subscribe` and `unsubscribe` methods
   * @param {string} requestorId - id embedded in the requestor topic pattern
   * @throws {Error} when `backend` or `requestorId` is missing
   */
  constructor(backend, requestorId) {
    if (!backend) {
      throw new Error(`the ${this.constructor.name} constructor requires a backend to be passed in`);
    }

    if (!requestorId) {
      throw new Error(`the ${this.constructor.name} constructor requires a requestorId to be passed in`);
    }

    this._backend = backend;
    // topic-param key -> topic params of every `_subscribeAndFetch` still active
    this._outstandingSubscribes = new Map();
    // correlation ids of fetches that have not been answered yet
    this._outstandingFetchCorrelationIds = new Set();
    // correlation ids of fetches whose answer must be dropped when it arrives
    this._canceledFetchCorrelationIds = new Set();
    // correlation id -> { success, error } callbacks for in-flight commands
    this._responseHandlers = {};

    // resolved through `this.constructor` so a subclass can override the factory
    this._broadcastTopicPattern = this.constructor.createTopicPattern(`v${this.version}/${this.baseTopicPattern}`);
    this._requestorTopicPattern = this.constructor.createTopicPattern(
      `v${this.version}/requestor/${requestorId}/${this.baseTopicPattern}`
    );

    // these methods are handed to the backend as bare callbacks
    _.bindAll(this, [
      'onDeleteSubscriber',
      'onReceiveSubscriber',
      'onResponseSubscriber',
      'reconnectHandler',
      'resetHandler',
    ]);
  }

  /**
   * Factory for the topic pattern objects used by the gateway.
   *
   * @param {string} topicPattern - pattern string with `+` wildcards
   * @returns {MqttTopicPattern}
   */
  static createTopicPattern(topicPattern) {
    return new MqttTopicPattern(topicPattern);
  }

  /**
   * Stores the org id on both topic patterns and remembers the agent id.
   *
   * @param {Object} [options]
   * @param {string} [options.orgId]
   * @param {string} [options.agentId]
   */
  init({ orgId, agentId } = {}) {
    this._broadcastTopicPattern.orgId = orgId;
    this._requestorTopicPattern.orgId = orgId;
    this._agentId = agentId;
  }

  /** Forgets every outstanding subscribe, fetch and response handler. */
  resetHandler() {
    this._outstandingSubscribes.clear();
    this._outstandingFetchCorrelationIds.clear();
    this._canceledFetchCorrelationIds.clear();
    this._responseHandlers = {};
  }

  /**
   * Drops all in-flight request state and re-issues a fetch for every
   * subscription registered through `_subscribeAndFetch`.
   */
  reconnectHandler() {
    this._responseHandlers = {};

    // With the response handlers gone, responses for earlier fetches can no
    // longer reach `_decorrelateFetchResponse`, so their ids would otherwise
    // stay in these sets forever (and be re-added to the canceled set by every
    // later `cancelFetches`).
    this._outstandingFetchCorrelationIds.clear();
    this._canceledFetchCorrelationIds.clear();

    this._outstandingSubscribes.forEach(topicParams => this._fetch(topicParams));
  }

  /** Topic pattern shared by the broadcast and requestor patterns. */
  get baseTopicPattern() {
    return `orgs/+/${this.topicPattern}`;
  }

  /** Must be overridden by subclasses. */
  get version() {
    throw new Error('version getter/property is not implemented');
  }

  /** Must be overridden by subclasses. */
  get topicPattern() {
    throw new Error('topicPattern getter/property is not implemented');
  }

  /** Must be overridden by subclasses; called for every broadcast entity. */
  onReceive() {
    throw new Error('onReceive function is not implemented');
  }

  // success handler registered by `_fetch`; forwards to `onFetchResponse`
  // unless the fetch was canceled
  _onFetchResponse(payload, topic, topicParams, correlationId) {
    this._decorrelateFetchResponse(correlationId, () => {
      this.onFetchResponse(payload, topic, topicParams, correlationId);
    });
  }

  // error handler registered by `_fetch`; forwards to `onFetchError`
  // unless the fetch was canceled
  _onFetchError(payload, topic, topicParams, correlationId) {
    this._decorrelateFetchResponse(correlationId, () => {
      this.onFetchError(payload, topic, topicParams, correlationId);
    });
  }

  /**
   * Marks a fetch as answered and invokes `callback` unless that fetch had
   * been canceled, in which case the cancellation marker is consumed instead.
   *
   * @param {string} correlationId
   * @param {Function} callback
   */
  _decorrelateFetchResponse(correlationId, callback) {
    this._outstandingFetchCorrelationIds.delete(correlationId);

    if (this._canceledFetchCorrelationIds.has(correlationId)) {
      this._canceledFetchCorrelationIds.delete(correlationId);
    } else {
      callback();
    }
  }

  /** Must be overridden by subclasses that fetch. */
  onFetchResponse() {
    throw new Error('onFetchResponse function is not implemented');
  }

  /** Must be overridden by subclasses that fetch. */
  onFetchError() {
    throw new Error('onFetchError function is not implemented');
  }

  /**
   * Publishes `payload` on the broadcast topic under a fresh correlation id.
   *
   * @param {Array} topicParams
   * @param {*} payload
   */
  _broadcast(topicParams, payload) {
    const correlationId = IdGenerator.newId();
    this._publish(this._broadcastTopicPattern.interpolate(topicParams), { correlationId, payload });
  }

  /**
   * Issues an `add` command; responses go to `onAddSuccess` / `onAddErrors`
   * when the subclass defines them.
   *
   * @returns {{correlationId: string}}
   */
  _add(topicParams, payload, correlationId) {
    return this._command('add', topicParams, payload, {
      correlationId,
      onSuccess: this.onAddSuccess,
      onError: this.onAddErrors,
    });
  }

  /**
   * Issues a `delete` command (without payload); responses go to
   * `onDeleteSuccess` / `onDeleteErrors` when the subclass defines them.
   *
   * @returns {{correlationId: string}}
   */
  _delete(topicParams) {
    return this._command('delete', topicParams, null, {
      onSuccess: this.onDeleteSuccess,
      onError: this.onDeleteErrors,
    });
  }

  /**
   * Issues an `update` command; responses go to `onUpdateSuccess` /
   * `onUpdateErrors` when the subclass defines them.
   *
   * @returns {{correlationId: string}}
   */
  _update(topicParams, payload) {
    return this._command('update', topicParams, payload, {
      onSuccess: this.onUpdateSuccess,
      onError: this.onUpdateErrors,
    });
  }

  /**
   * Publishes `envelope` on `<requestor topic>/command/<command>`.
   *
   * @param {string} command
   * @param {Array} topicParams
   * @param {Object} envelope
   */
  _publishCommand(command, topicParams, envelope) {
    let publishTopic = this._requestorTopicPattern.interpolate(topicParams);
    if (command === 'add' || command === 'fetch') {
      // replace a trailing `/+` to support proper generation of the add
      // and fetch topics
      //
      // e.g. the agents gateway topic is `/agents/+`, but agents are added
      // using the topic `/agents/command/add` and fetched using the topic
      // `/agents/command/fetch`
      publishTopic = publishTopic.replace(/\/\+$/, '');
    }
    this._publish(`${publishTopic}/command/${command}`, envelope);
  }

  // backend callback for broadcast entity messages
  onReceiveSubscriber(envelope, topic) {
    this.onReceive(envelope.payload, topic, this._broadcastTopicPattern.extractParams(topic), envelope.correlationId);
  }

  // backend callback for broadcast delete messages
  onDeleteSubscriber(envelope, topic) {
    this.onDelete(topic, this._broadcastTopicPattern.extractParams(topic), envelope.correlationId);
  }

  /**
   * Builds the broadcast subscriptions for `topicParams`.
   *
   * @param {Array} topicParams
   * @returns {Object} map of handler method name -> topic; the delete entry is
   *   only present when the subclass defines `onDelete`
   */
  _baseSubscriptions(topicParams) {
    const subscriptions = { onReceiveSubscriber: this._broadcastTopicPattern.interpolate(topicParams) };
    if (_.isFunction(this.onDelete)) {
      subscriptions.onDeleteSubscriber = `${this._broadcastTopicPattern.interpolate(topicParams)}/event/delete`;
    }
    return subscriptions;
  }

  /**
   * Calls `func(topic, handler)` for every entry of a subscriptions map, e.g.
   * the result of `_baseSubscriptions` paired with `this._backend.subscribe`
   * or `this._backend.unsubscribe`.
   *
   * @param {Object} subscriptions - map of handler method name -> topic
   * @param {Function} func
   */
  _mapSubscriptions(subscriptions, func) {
    _.forEach(subscriptions, (topic, handlerName) => {
      func(topic, this[handlerName]);
    });
  }

  /**
   * Publishes to a fully interpolated topic; every argument after `topic` is
   * forwarded to the backend unchanged.
   *
   * @param {string} topic
   * @param {...*} args - payload and any further backend arguments
   * @throws {Error} when the topic still contains a `+` wildcard
   */
  _publish(topic, ...args) {
    if (/\+/.test(topic)) {
      throw new Error(`publish to topic [${topic}] missing param`);
    }
    this._backend.publish(topic, ...args);
  }

  /** Subscribes to the broadcast topics for `topicParams`. */
  _subscribe(topicParams) {
    this._mapSubscriptions(this._baseSubscriptions(topicParams), this._backend.subscribe.bind(this._backend));
  }

  /**
   * Subscribes to the broadcast topics and fetches the current state. The
   * subscription is remembered so `reconnectHandler` can fetch it again.
   *
   * @returns {{correlationId: string}} handle of the issued fetch
   */
  _subscribeAndFetch(topicParams) {
    this._outstandingSubscribes.set(getTopicParamKey(topicParams), topicParams);
    this._subscribe(topicParams);
    return this._fetch(topicParams);
  }

  /** Unsubscribes from the broadcast topics and forgets the subscription. */
  _unsubscribe(topicParams) {
    this._outstandingSubscribes.delete(getTopicParamKey(topicParams));
    this._mapSubscriptions(this._baseSubscriptions(topicParams), this._backend.unsubscribe.bind(this._backend));
  }

  /**
   * Issues a `fetch` command and tracks it as outstanding until a response
   * is handled.
   *
   * @returns {{correlationId: string}}
   */
  _fetch(topicParams, payload, correlationId) {
    const request = this._command('fetch', topicParams, payload, {
      correlationId,
      onSuccess: this._onFetchResponse,
      onError: this._onFetchError,
    });
    this._outstandingFetchCorrelationIds.add(request.correlationId);
    return request;
  }

  /**
   * Subscribes to the response topic (when handlers are given) and then
   * publishes the command wrapped in an envelope.
   *
   * @param {string} command
   * @param {Array} topicParams
   * @param {*} payload
   * @param {Object} [options]
   * @param {string} [options.correlationId] - generated when omitted
   * @param {Function} [options.onSuccess]
   * @param {Function} [options.onError]
   * @returns {{correlationId: string}}
   */
  _command(command, topicParams, payload, { correlationId, onSuccess, onError } = {}) {
    const requestId = correlationId || IdGenerator.newId();

    // subscribe to the response topic before issuing the command
    this._commandSubscribe(topicParams, { correlationId: requestId, onSuccess, onError }, () => {
      // issue the command
      this._publishCommand(command, topicParams, wrapWithEnvelope(payload, requestId));
    });

    return { correlationId: requestId };
  }

  /**
   * Registers the response handlers for `correlationId` and subscribes to the
   * response topic; `callback` runs once the backend reports the subscription,
   * or immediately when there is nothing to subscribe to.
   */
  _commandSubscribe(topicParams, { correlationId, onSuccess, onError }, callback) {
    if (!onSuccess && !onError) {
      // there are no response handlers, call the callback immediately
      callback();
      return;
    }

    // store the response handler for use in onResponseSubscriber
    this._responseHandlers[correlationId] = { success: onSuccess, error: onError };

    // subscribe to the generic response topic and call the callback when subscribed
    const responseTopic = this._createResponseTopic(this._requestorTopicPattern, topicParams);
    this._backend.subscribe(responseTopic, this.onResponseSubscriber, callback);
  }

  /**
   * Builds the wildcard topic on which responses for `topicParams` arrive.
   *
   * @param {Object} topicPattern - pattern providing `interpolate`
   * @param {Array} [topicParams]
   * @returns {string}
   */
  _createResponseTopic(topicPattern, topicParams) {
    // subscribe to the wildcard response topic, so we only have one
    // subscription for all request/responses pairs
    const wildcardTopicParams = (topicParams || []).map(topicParam => {
      // if the topic parameter is the current agent id, then don't make it a
      // wildcard - as it may be required for broker authorization checks
      return topicParam === this._agentId ? this._agentId : '+';
    });

    // replace an unspecified trailing + with a /, since the response topic
    // for /agents/+ is /agents we transform it into /agents// and then remove
    // all the trailing slashes
    const responseTopicParams = wildcardTopicParams.concat('/');

    return topicPattern.interpolate(responseTopicParams).replace(/\/+$/, '');
  }

  /**
   * Backend callback for command responses. Looks up the handlers stored for
   * the envelope's correlation id, removes them, and invokes the one matching
   * `envelope.status` (if any) with `this` bound to the gateway.
   */
  onResponseSubscriber(envelope, topic) {
    const handlers = this._responseHandlers[envelope.correlationId];
    delete this._responseHandlers[envelope.correlationId];

    const callback = handlers && handlers[envelope.status];
    if (callback != null) {
      const topicParams = this._requestorTopicPattern.extractParams(topic);
      callback.call(this, envelope.payload, topic, topicParams, envelope.correlationId);
    }
  }

  /** Cancels all outstanding fetches; their responses will be dropped. */
  cancelFetches() {
    this._outstandingFetchCorrelationIds.forEach(correlationId => {
      this._canceledFetchCorrelationIds.add(correlationId);
    });
  }
}

// Mix the Observable members into the gateway prototype so that every gateway
// instance inherits them. NOTE(review): presumably this is where the
// `notifyObservers` method used in the usage example comes from - confirm
// against the observable mixin.
_.extend(MqttGateway.prototype, Observable);

/**
 * Builds the message envelope that is published for a command.
 *
 * @param {*} payload - attached as `payload` unless it is null or undefined
 * @param {string} [correlationId] - a new id is generated when falsy
 * @returns {{correlationId: string, payload: *}} the envelope; `payload` is
 *   absent when no payload was given
 */
export function wrapWithEnvelope(payload, correlationId) {
  const envelope = {};
  envelope.correlationId = correlationId ? correlationId : IdGenerator.newId();

  const hasPayload = payload !== null && payload !== undefined;
  if (hasPayload) {
    envelope.payload = payload;
  }

  return envelope;
}

/**
 * Derives the key under which a set of topic params is tracked.
 *
 * @param {Array} [topicParams]
 * @returns {string} the params joined by a space, or `_no_param` when there
 *   are no params or the joined string is empty
 */
function getTopicParamKey(topicParams) {
  if (!topicParams) {
    return '_no_param';
  }

  const joinedParams = topicParams.join(' ');
  return joinedParams || '_no_param';
}

// Number of leading wildcards that are filled from the pattern's own state
// instead of caller-supplied topic params.
const FIXED_PARAM_COUNT = 1; // :orgId

/**
 * A topic pattern whose `+` wildcards can be filled with params
 * (`interpolate`) or read back from a concrete topic (`extractParams`).
 * The first wildcard is always the org id, taken from the `orgId` property
 * that the owner assigns after construction.
 */
class MqttTopicPattern {
  /**
   * @param {string} patternString - e.g. `orgs/+/something/+/hello/+`
   */
  constructor(patternString) {
    this._patternString = patternString;
  }

  /**
   * Extracts the caller-visible params (everything after the org id) from a
   * concrete topic.
   *
   * patternString is: orgs/+/something/+/hello/+
   *   orgs/exampleOrgId/something/23/hello/1                 -> ['23', '1']
   *   orgs/exampleOrgId/something/23/hello                   -> ['23']
   *   orgs/exampleOrgId/something/23/hello/command/add/error -> ['23']
   *
   * @param {string} topicString
   * @returns {string[]}
   * @throws {Error} when the topic does not match the pattern
   */
  extractParams(topicString) {
    // convert MQTT topic pattern to a regex
    const topicRegex = this._patternString
      .replace(/\+/g, '([^/]+)') // match on all +
      .replace(/\/\(\[\^\/\]\+\)$/, '(?:/([^/]+))?'); // convert (what was) a trailing + to an optional match

    const matches = topicString.match(topicRegex);

    if (matches === null) {
      throw new Error(`expected [${JSON.stringify(topicRegex)}] to match [${JSON.stringify(topicString)}]`);
    }

    // a trailing wildcard that captured the literal `command` segment of a
    // command topic is not a real param, so it is dropped
    if (matches[matches.length - 1] === 'command') {
      return matches.slice(1 + FIXED_PARAM_COUNT, -1);
    }

    // an unmatched optional trailing group shows up as undefined; remove it
    return matches.slice(1 + FIXED_PARAM_COUNT).filter(Boolean);
  }

  /**
   * Fills the pattern's wildcards with the org id followed by `topicParams`.
   * Wildcards without a corresponding param (or with an empty-string param)
   * are left as `+`. A numeric `0` is a valid param and is written as `0`.
   *
   * patternString is: orgs/+/something/+/hello/+, orgId is: exampleOrgId
   *   [ 1, 2 ] -> orgs/exampleOrgId/something/1/hello/2
   *   [ 1 ]    -> orgs/exampleOrgId/something/1/hello/+
   *   [ ]      -> orgs/exampleOrgId/something/+/hello/+
   *
   * @param {Array} [topicParams]
   * @returns {string}
   * @throws {Error} when `orgId` is not set or a param is null/undefined
   */
  interpolate(topicParams) {
    const params = this._withOrgId(topicParams);

    if (params.some(p => p == null)) {
      throw new Error(`${this._patternString} missing param ${JSON.stringify(params)}`);
    }

    return this._patternString.replace(/\+/g, () => {
      if (params.length === 0) {
        return '+';
      }

      const param = params.shift();
      // `param || '+'` alone would turn the valid id 0 into a wildcard
      return param || param === 0 ? String(param) : '+';
    });
  }

  /**
   * @param {Array} [topicParams]
   * @returns {Array} a new array: the org id followed by `topicParams`
   * @throws {Error} when `orgId` is not set
   */
  _withOrgId(topicParams) {
    if (!this.orgId) {
      throw new Error('orgId not set');
    }

    return [this.orgId].concat(topicParams || []);
  }
}
