import { Injectable } from '@angular/core';
import { StorageService } from '@service/common/storage.service';
import { environment } from 'environments/environment';
import { IClientPublishOptions } from 'mqtt';
import { IMqttMessage, IMqttServiceOptions, MqttConnectionState, MqttService } from 'ngx-mqtt';
import {
  BehaviorSubject,
  Observable,
  Subscription,
  combineLatestWith,
  distinctUntilChanged,
  filter,
  map,
  of,
  switchMap,
  tap,
} from 'rxjs';
import { AvailableAPI, RequestMethod, UseHeaderType } from '@class/commons/request-api.model';
import { DateTime } from 'luxon';
import { ApiWrapper } from '@service/common/api-wrapper.service';
import { AppCredentialsApi } from '@custom-types/core/app-credentials-api.model';

// Name of the URL query-string parameter that carries the auth token
// when the MQTT websocket connection is opened.
const MQTT_CONNECT_AUTH_PARAM = 'token';

/**
 * Why do we have this service?
 * When the access token expires and we get a new one, the mqtt connection has to be renewed:
 * the existing connection over wss is closed and a new one is opened with the valid token.
 * Force-disconnecting the existing connection clears all the subscriptions held on that connection.
 * The ngx-mqtt lib that we are using does not auto-renew the subscriptions after a forced disconnect
 * (it only does so on an automatic reconnect), and it gives us no capability to ask for it.
 *
 * To overcome that problem, this service is a wrapper around the ngx-mqtt lib.
 * It keeps the list of subscriptions and exposes only observables to the rest of the application.
 * When the connection is renewed, it re-subscribes the already subscribed topics under the hood,
 * so the rest of the application keeps working/behaving as expected.
 *
 * Renewing the subscriptions should be very fast,
 * but there is a slight chance that a message published
 * while the renewal is in progress will be missed.
 *
 * The ngx-mqtt lib also exposes event emitters (onConnect, onDisconnect, etc.)
 * that we could have used.
 * However, when a connection drops on its own, the lib reconnects it, renews the
 * subscriptions automatically and emits those events (onConnect etc.) as well.
 * We only want to renew when we get a new token, so that we can close the existing connection
 * and open a new one with the valid token.
 */
@Injectable({
  providedIn: 'root',
})
export class NgxMqttWrapperService {
  // Registry of the live broker subscriptions, keyed by topic.
  // Every topic listed here is re-subscribed when the connection is renewed,
  // so an entry must be removed as soon as its subscription is given up.
  private _listOfSubs: {
    [index: string]: Subscription;
  } = {};

  // Per-topic behavior subjects (and their read-only observables) used as a
  // small store: each one holds the last message received on its topic.
  private _listOfBs: {
    [index: string]: BehaviorSubject<IMqttMessage>;
  } = {};
  observables: {
    [filterString: string]: Observable<IMqttMessage>;
  } = {};

  #connectionState$ = this._mqttService.state.asObservable();

  // true for every connection state other than CLOSED
  #connected$ = this.#connectionState$.pipe(
    map((state) => state !== MqttConnectionState.CLOSED),
    distinctUntilChanged(),
  );

  constructor(
    private _mqttService: MqttService,
    private _storageService: StorageService,
    private _apiWrapper: ApiWrapper,
  ) {
    // Emits only once both the auth token and the websocket credentials are available.
    const authTokenAndCredentials$ = this._storageService.authToken$.pipe(
      combineLatestWith(this._storageService.rabbitMQWebSocketCredentials$),
      filter(([authToken, credentials]) => !!authToken && !!credentials),
    );

    // Credentials flagged as `fresh` are (re)fetched from the API and stored with a
    // one day expiry; the connect pipeline below ignores them until that is done.
    // NOTE(review): an error from the request propagates through switchMap and ends
    // this subscription for good — confirm ApiWrapper handles request errors itself.
    authTokenAndCredentials$
      .pipe(
        filter(([, credentials]) => credentials.fresh),
        switchMap(() => this.getRabbitMQUsernamePassword()),
        tap((credentials) => {
          this._storageService.setAppApiKeysTokensCredentials({
            appCredentials: credentials,
            expires: DateTime.now().plus({ days: 1 }).toMillis(),
          });
        }),
      )
      .subscribe();

    // (Re)connect whenever the token or the credentials change.
    authTokenAndCredentials$
      .pipe(
        combineLatestWith(this.#connected$),
        distinctUntilChanged(([[previousToken, previousCredentials]], [[currentToken, currentCredentials]]) => {
          // we only want to connect to mqtt when we get a new token
          // or we have new credentials; connection state changes alone are ignored
          return previousToken === currentToken && previousCredentials === currentCredentials;
        }),
        filter(([[, credentials]]) => !credentials.fresh),
        tap(([[authToken, mqttCredentials], connected]) => {
          this.connectMqttWebSocket({
            authToken,
            username: mqttCredentials.username,
            password: mqttCredentials.password,
            connected,
          });
        }),
      )
      .subscribe();
  }

  /**
   * Returns an observable of the messages received on `topic`.
   *
   * The broker subscription is owned by this service, so it can be renewed when the
   * connection is re-created. It is shared by every caller observing the same topic:
   * an already live subscription is reused instead of being torn down and re-created
   * on each call.
   *
   * @param topic mqtt topic (filter string) to observe
   * @returns observable emitting the last known message (if any) followed by new ones
   */
  observe(topic: string): Observable<IMqttMessage> {
    if (!this._listOfBs[topic]) {
      this._listOfBs[topic] = new BehaviorSubject(null as IMqttMessage);
      this.observables[topic] = this._listOfBs[topic].asObservable();
    }
    // subscribe eagerly so messages are stored even before the caller subscribes
    this.ensureTopicSubscription(topic);

    // the subject starts with `null`; never leak that placeholder to consumers
    const messages$ = this.observables[topic].pipe(filter((msg) => msg !== null));

    return new Observable<IMqttMessage>((subscriber) => {
      // The fail safe in `unsubscribe()` may have closed the broker subscription
      // between `observe()` and the moment the consumer actually subscribes
      // (e.g. through the async pipe), so make sure it is live again.
      this.ensureTopicSubscription(topic);
      return messages$.subscribe(subscriber);
    });
  }

  /**
   * Publishes `message` on `topic` through the underlying mqtt service.
   *
   * @param topic mqtt topic to publish to
   * @param message payload to publish
   * @param options optional publish options forwarded as they are
   */
  publish(topic: string, message: string | Buffer, options?: IClientPublishOptions): Observable<void> {
    return this._mqttService.publish(topic, message, options);
  }

  /**
   * Unsubscribes the given subscription (if any) and then releases the broker
   * subscription of every topic that no longer has any observer.
   *
   * @param subscription consumer subscription to close; may be omitted to only run the clean up
   */
  unsubscribe(subscription?: Subscription): void {
    subscription?.unsubscribe();

    // this is another fail safe to avoid memory leaks
    // observables that are directly subscribed in html via async pipe
    // can not call unsubscribe on that
    // doing this as a fail safe so we don't keep those subscriptions open
    // once they are being destroyed
    for (const [topic, subject] of Object.entries(this._listOfBs)) {
      if (!subject.observed) {
        this._listOfSubs[topic]?.unsubscribe();
        // drop the entry as well, otherwise the next connection renewal would
        // re-subscribe a topic that nobody is observing anymore
        delete this._listOfSubs[topic];
      }
    }
  }

  /**
   * Private Methods
   */

  /**
   * Opens the mqtt websocket connection with the given credentials, closing the
   * current connection first when there is one, and renews the topic subscriptions.
   */
  private connectMqttWebSocket({
    authToken,
    username,
    password,
    connected,
  }: {
    authToken: string;
    username: string;
    password: string;
    connected: boolean;
  }) {
    // connect to the mqtt service on login
    const options: IMqttServiceOptions = {
      ...environment.mqttConfig,
      username,
      password,
    } as IMqttServiceOptions;
    // add JWT token to URL as query string param
    options.url += `?${MQTT_CONNECT_AUTH_PARAM}=${authToken}`;
    if (connected) {
      this._mqttService.disconnect(true);
    }
    this._mqttService.connect(options);
    this.renewMqttConnection();
  }

  /**
   * Re-creates the broker subscription of every registered topic so the new
   * connection keeps feeding the behavior subjects.
   */
  private renewMqttConnection(): void {
    for (const topic of Object.keys(this._listOfSubs)) {
      this.subscribeToTopic(topic);
    }
  }

  /**
   * Subscribes to `topic` only when there is no live subscription for it yet.
   */
  private ensureTopicSubscription(topic: string): void {
    const current = this._listOfSubs[topic];
    if (!current || current.closed) {
      this.subscribeToTopic(topic);
    }
  }

  /**
   * (Re)creates the broker subscription for `topic` and forwards every received
   * message to the topic's behavior subject.
   */
  private subscribeToTopic(topic: string): void {
    // fail safe
    // if there is a subscription, close it first
    // otherwise it'll create memory leaks
    this._listOfSubs[topic]?.unsubscribe();

    this._listOfSubs[topic] = this._mqttService.observe(topic).subscribe((msg) => {
      this._listOfBs[topic].next(msg);
    });
  }

  /**
   * Fetches the application credentials from the API.
   * While tests are running a fixed set of dummy credentials is returned instead.
   */
  private getRabbitMQUsernamePassword(): Observable<AppCredentialsApi> {
    if (environment.testsRunning) {
      return of({
        websocket: {
          username: 'test',
          password: 'test',
        },
        posthog: {
          token: 'test',
        },
        google_maps: {
          api_key: 'test',
        },
      });
    }

    return this._apiWrapper.handleObservableRequest<AppCredentialsApi>({
      useAPI: AvailableAPI.SWITCHDIN,
      url: `/api/v1/credentials/stormcloud-app/`,
      requestMethod: RequestMethod.GET,
      useHeader: UseHeaderType.AUTHORIZED_SWDIN,
      requestData: {},
    });
  }
}
