import _ from "lodash";
import Packetizer from "../../packetizer";
import { TunnelStats } from "../../packetizer/stats";
import { isLcmMsg, isPbMsg, WireMsg, WireType, PublishOptions } from "../../types";
import { Connector } from "../base";
import { ChannelDefinition, RemoteDeviceType } from "../types";
import LivekitTransportLayer, { PROTOCOL_CHANNELS } from "./livekit_transport_layer";
import { logger } from "../../logger";

const MAX_CHUNK_SIZE = 64000; // bytes

const PACKETIZER_INTERVAL = 10; // dt for the packetizer's event loop
const PACKETIZER_STATS_INTERVAL = 5000; // time for updating packetizer stats

const getCurrentTime = () => Date.now();

export interface LivekitRoundTripStats {
  averageRoundTripTime: number; // average round trip time for the session
  lastRecordedCurrentRoundTripTime: number; // most recently recorded round trip time
  averageLastTens: number; // average of the last 10 most recently recorded round trip times
}

export default class LivekitConnector extends Connector {
  private packetizer!: Packetizer;

  public tunnelStats: TunnelStats;
  public roundTripStats?: LivekitRoundTripStats;
  public lastReceivedTunnelMessage: number;
  private pingRepliesReceived: number;
  private totalRoundTripTime: number;
  private lastTenRTT: number[]; // store the last 10 recorded round trip times

  private packetizerIntervalId?: number | NodeJS.Timeout;
  private packetizerStatsIntervalId?: number | NodeJS.Timeout;

  constructor(
    readonly transport: LivekitTransportLayer,
    public readonly deviceType: RemoteDeviceType
  ) {
    super();

    this.packetizer = new Packetizer(
      (channel, data, options) => {
        if (this.sid) {
          this.transport.publishTunnelMessage(channel, this.sid, data, options);
        }
      },
      this.handleMessage,
      MAX_CHUNK_SIZE
    );

    this.reset();
  }

  private get sid() {
    return this.transport.getDeviceSid(this.deviceType);
  }

  public reset = () => {
    this.pingRepliesReceived = 0;
    this.totalRoundTripTime = 0;
    this.lastTenRTT = [];

    this.tunnelStats = this.packetizer.getStats();
  };

  public startPacketizer = () => {
    this.packetizerIntervalId = setInterval(() => this.packetizer.tick(), PACKETIZER_INTERVAL);
    this.packetizerStatsIntervalId = setInterval(() => {
      this.tunnelStats = this.packetizer.getStats();
    }, PACKETIZER_STATS_INTERVAL);
  };

  public stopPacketizer = () => {
    if (this.packetizerIntervalId) {
      clearInterval(this.packetizerIntervalId as number);
    }
    if (this.packetizerStatsIntervalId) {
      clearInterval(this.packetizerStatsIntervalId as number);
    }
  };

  public handleTunnelData = (data: ArrayBuffer) => {
    this.lastReceivedTunnelMessage = performance.now();
    this.packetizer.handleChunkData(data);
  };

  public publish = async <D extends WireMsg>(
    channel: string | ChannelDefinition<WireType<D>>,
    msg: D,
    options?: PublishOptions
  ) => {
    const channelName = _.isString(channel) ? channel : channel.channel;

    if (this.sid) {
      if (PROTOCOL_CHANNELS.has(channelName) && isPbMsg(msg)) {
        const serialized = msg.serializeBinary();
        this.transport.publishProtocolMessage(this.sid, serialized, channelName, options);
        this.transport.addMessageToLivekitSkybusStatsReport(
          "sendStats",
          channelName,
          serialized.byteLength,
          this.sid
        );
        return;
      }

      let data: Uint8Array;
      if (isPbMsg(msg)) {
        data = new Uint8Array(msg.serializeBinary());
      } else if (isLcmMsg(msg)) {
        data = new Uint8Array(msg.encode());
      } else {
        throw new Error("Can't publish a non-protobuf, non-lcm message.");
      }

      if (this.transport.isConnected) {
        this.packetizer.transmitEvent(channelName, data, options);
        this.transport.addMessageToLivekitSkybusStatsReport(
          "sendStats",
          channelName,
          data.byteLength,
          this.sid
        );
      } else {
        logger.info("Livekit Connector not connected, publish failed.");
      }
    } else {
      // TODO(sam): log something useful for no sid
    }
  };

  public recordRoundTripStatsFromPing = (pingTime: number) => {
    if (!this.roundTripStats) {
      this.roundTripStats = {
        lastRecordedCurrentRoundTripTime: 0,
        averageRoundTripTime: 0,
        averageLastTens: 0,
      };
    }

    this.pingRepliesReceived += 1;
    const lastRecordedCurrentRoundTripTime = (getCurrentTime() - pingTime) / 1000;
    this.roundTripStats.lastRecordedCurrentRoundTripTime = lastRecordedCurrentRoundTripTime;
    this.totalRoundTripTime += lastRecordedCurrentRoundTripTime;
    if (this.pingRepliesReceived > 0) {
      this.roundTripStats.averageRoundTripTime = this.totalRoundTripTime / this.pingRepliesReceived;
    }
    this.lastTenRTT.push(this.roundTripStats.lastRecordedCurrentRoundTripTime);
    if (this.lastTenRTT && this.lastTenRTT.length > 10) {
      this.lastTenRTT.shift();
    }
    this.roundTripStats.averageLastTens =
      this.lastTenRTT?.reduce((acc, num) => acc + num, 0) / this.lastTenRTT.length;
  };
}
