import { WEBRTC_SUBSCRIPTION_ACK_PB } from "@skydio/channels/src/webrtc_subscription_ack_pb";
import { WEBRTC_SUBSCRIPTION_LIST_PB } from "@skydio/channels/src/webrtc_subscription_list_pb";
import { SERVER_INFO_PB } from "@skydio/channels/src/server_info_pb";
import type { tunnel_channel_t } from "@skydio/lcm/types/skybus_tunnel/tunnel_channel_t";
import type { QosClass } from "@skydio/pbtypes/pbtypes/gen/skybus_tunnel/qos_class_pb";
import type { SubscriptionAck } from "@skydio/pbtypes/pbtypes/infrastructure/skybus_tunnel/tunnel_pb";
import {
  SubscribedChannel,
  SubscribedChannelList,
} from "@skydio/pbtypes/pbtypes/infrastructure/skybus_tunnel/tunnel_pb";

import Packetizer from "@skydio/skybus/src/packetizer";
import { logger } from "./logger";
import type { PublishOptions } from "@skydio/skybus/src/types";

/** Cleanup callback; `SkybusTunnel.start` returns one of these to stop the tunnel's timers. */
type CloseTunnel = () => void;

/** Callback that receives one outgoing chunk for a channel, plus optional publish options. */
type ChunkSender = (channel: string, data: Uint8Array, options?: PublishOptions) => void;

/** Callback that receives one complete incoming message for a channel. */
type MessageListener = (channel: string, payload: Uint8Array) => void;

/** Chunk size limit handed to the packetizer, in bytes. */
const MAX_CHUNK_SIZE = 64_000;

/** Period, in milliseconds, of the subscription-list check (1Hz). */
const SUBSCRIPTION_LIST_SEND_INTERVAL = 1_000;

/** Period, in milliseconds, of the packetizer tick (100Hz). TODO(trey): is this a reasonable value? */
const PACKETIZER_TICK_INTERVAL = 10;

/**
 * One end of a skybus tunnel between a `src` and a `dst` peer.
 *
 * The tunnel owns a `Packetizer` that splits outgoing messages into chunks and
 * reassembles incoming chunks into messages. It also keeps a list of desired
 * channel subscriptions and, once started, periodically pushes that list to
 * the peer until the peer acknowledges it with the matching nonce.
 */
export class SkybusTunnel {
  /** Subscriptions requested by the caller (the desired state). */
  private _subscribedChannels: Array<tunnel_channel_t>;
  /** Channel names from the most recently acknowledged subscription list. */
  private _activeChannels: Set<string>;
  /** Channel names contained in the most recently sent subscription list. */
  private _pendingChannels: Set<string>;
  /** Nonce of the most recently sent subscription list (0 before the first send). */
  private _nonce: number;
  private _src: string;
  private _dst: string;
  /** Pilot id taken from the last message received on the server-info channel. */
  private _pilot: string;
  private _packetizer: Packetizer;
  private _chunkSender: ChunkSender | null;
  private _messageListener: MessageListener | null;

  /**
   * @param src identity of this end of the tunnel; also sent as the client id
   *   in subscription lists
   * @param dst identity of the remote end of the tunnel
   */
  constructor(src: string, dst: string) {
    // source and destination identities for the tunnel peers
    this._src = src;
    this._dst = dst;
    this._pilot = "";

    // subscription list management
    this._subscribedChannels = [];
    this._activeChannels = new Set();
    this._pendingChannels = new Set();
    this._nonce = 0;

    // packetizer for handling chunking and reassembly
    this._packetizer = new Packetizer(
      this._onSendChunk.bind(this),
      this._onRecvMessage.bind(this),
      MAX_CHUNK_SIZE
    );
    this._chunkSender = null;
    this._messageListener = null;
  }

  /** @returns the source identity given to the constructor */
  getSrc() {
    return this._src;
  }

  /** @returns the destination identity given to the constructor */
  getDst() {
    return this._dst;
  }

  /**
   * Appends channels to the desired subscription list. Nothing is sent
   * immediately; the interval started by `start` picks up the change.
   * Entries are not de-duplicated here.
   */
  subscribe(...channels: Array<tunnel_channel_t>) {
    this._subscribedChannels.push(...channels);
  }

  /**
   * Removes every desired subscription whose channel name matches one of the
   * arguments. Each argument is either a channel name or a channel descriptor.
   */
  unsubscribe(...channels: Array<tunnel_channel_t | string>) {
    const unsubscribeFrom = new Set<string>(
      channels.map(channel => (typeof channel === "string" ? channel : channel.channel))
    );
    this._subscribedChannels = this._subscribedChannels.filter(
      ({ channel }) => !unsubscribeFrom.has(channel)
    );
  }

  /** Hands a message to the packetizer for transmission on `channel`. */
  send(channel: string, payload: Uint8Array, options?: PublishOptions) {
    this._packetizer.transmitEvent(channel, payload, options);
  }

  /** Hands a chunk received from the peer to the packetizer. */
  handleChunk(chunk: Uint8Array) {
    this._packetizer.handleChunkData(chunk);
  }

  /**
   * Starts the tunnel: installs the callbacks and begins the periodic
   * subscription-list check and packetizer tick.
   *
   * @param chunkSender called with every chunk the packetizer wants sent
   * @param messageListener called with every message the packetizer delivers
   * @returns a function that removes both callbacks and clears both intervals
   */
  start(chunkSender: ChunkSender, messageListener: MessageListener): CloseTunnel {
    this._chunkSender = chunkSender;
    this._messageListener = messageListener;

    // start an interval to resend the subscription list as needed; scheduling on an interval helps
    // ensure we don't oversend if some frontend logic thrashes adding/removing subscriptions
    const subListInterval = setInterval(() => {
      // send the subscription list with a new nonce, if necessary
      if (this._needsSubscriptionListSend()) {
        // nothing counts as active again until the new list is acknowledged
        this._activeChannels.clear();
        this._sendSubscriptionList();
      }
    }, SUBSCRIPTION_LIST_SEND_INTERVAL);

    // start an interval to tick the packetizer
    const packetizerTickInterval = setInterval(() => {
      this._packetizer.tick();
    }, PACKETIZER_TICK_INTERVAL);

    // return a cleanup function to clear intervals and remove message listeners
    return () => {
      this._chunkSender = null;
      this._messageListener = null;
      clearInterval(subListInterval);
      clearInterval(packetizerTickInterval);
    };
  }

  /**
   * Reports whether the desired channel names differ from the acknowledged
   * ones. Names are compared as sets, so duplicate entries in the desired list
   * do not force a resend on every tick.
   *
   * NOTE(review): the active set is cleared before every send, so an
   * unacknowledged empty list looks identical to an acknowledged one and is
   * not retried if its ack is lost.
   */
  private _needsSubscriptionListSend(): boolean {
    const wanted = new Set(this._subscribedChannels.map(({ channel }) => channel));
    if (wanted.size !== this._activeChannels.size) {
      return true;
    }
    // same number of unique names: they differ iff some desired name is not active
    return this._subscribedChannels.some(({ channel }) => !this._activeChannels.has(channel));
  }

  /**
   * Serializes the desired subscriptions into a `SubscribedChannelList` tagged
   * with this tunnel's client id and a fresh nonce, and transmits it on the
   * subscription-list channel. The channel names sent are remembered so that
   * a later ack is credited to exactly this list.
   */
  private _sendSubscriptionList() {
    const subList = new SubscribedChannelList();
    const sentChannels = new Set<string>();
    this._subscribedChannels.forEach(channel => {
      const sub = new SubscribedChannel();
      sub.setChannel(channel.channel);
      sub.setDownsampleNominalDt(channel.downsample_nominal_dt);
      sub.setDownsampleLowThrottleDt(channel.downsample_low_throttle_dt);
      sub.setQos(channel.qos.value as unknown as QosClass.Enum);
      subList.addChannels(sub);
      sentChannels.add(channel.channel);
    });
    subList.setClientId(this._src);
    subList.setNonce(this._nextNonce());
    this._pendingChannels = sentChannels;
    this._packetizer.transmitEvent(WEBRTC_SUBSCRIPTION_LIST_PB.channel, subList.serializeBinary());
  }

  /** Packetizer callback: forwards an outgoing chunk to the registered sender, if any. */
  private _onSendChunk(channel: string, payload: Uint8Array, options?: PublishOptions) {
    this._chunkSender?.(channel, payload, options);
  }

  /**
   * Packetizer callback: forwards the message to the registered listener (if
   * any) first, then handles the two channels the tunnel itself consumes:
   * subscription acks and server info.
   */
  private _onRecvMessage(channel: string, payload: Uint8Array) {
    this._messageListener?.(channel, payload);
    if (channel === WEBRTC_SUBSCRIPTION_ACK_PB.channel) {
      const ack = WEBRTC_SUBSCRIPTION_ACK_PB.type(payload);
      this._onSubscriptionListAck(ack);
    } else if (channel === SERVER_INFO_PB.channel) {
      const info = SERVER_INFO_PB.type(payload);
      this._pilot = info.getPilotId();
    }
  }

  /**
   * Handles an ack from the peer. If its nonce matches the last list sent, the
   * channels in that list become the active set. Any other nonce is logged and
   * ignored.
   */
  private _onSubscriptionListAck(ack: SubscriptionAck) {
    const ackNonce = ack.getNonce();
    if (this._nonce !== ackNonce) {
      logger.warn("received subscription list ack with unexpected nonce", {
        nonce: this._nonce,
        received: ackNonce,
      });
      return;
    }

    // Credit the list that was actually sent under this nonce. The desired list may have
    // changed since then, and those changes must still trigger a resend on the next tick.
    this._activeChannels.clear();
    this._pendingChannels.forEach(channel => {
      this._activeChannels.add(channel);
    });
  }

  /** Increments the nonce and returns the new value. */
  private _nextNonce() {
    return ++this._nonce;
  }
}
