import { McapIndexedReader, McapTypes } from "@mcap/core";

import { loadDecompressHandlers } from "./decompressHandlers";
import { McapParsingError } from "./exceptions";
import { type Message } from "./Message";
import { MessageChunks } from "./MessageChunks";
import { type Readable } from "./ReadableFile";
import { makeDeserializer, type Deserializer } from "./serde";

interface InitializationOptions {
  cache?: boolean;
}

interface BaseOptions {
  topics: string[];
  abortSignal: AbortSignal;
}

interface LatestMessageAsOfTimeOptions extends BaseOptions {
  bufferDurationNs?: bigint;
}

interface LoadMessagesOptions extends BaseOptions {
  startTime: bigint;
  endTime: bigint;
}

export class McapReader<MessageData = Record<string, unknown>> {
  #channelInfoById: Map<
    number,
    {
      // Channel definition
      channel: McapTypes.Channel;
      // Channel schema
      schema: McapTypes.Schema;
      // Deserializer for messages within this channel
      deserializer: Deserializer;
    }
  > = new Map();
  #file: Readable;
  #chunks: MessageChunks<MessageData>;
  #options: InitializationOptions;
  #reader: McapIndexedReader;

  static async forFile(
    file: Readable,
    options?: InitializationOptions & { abortSignal?: AbortSignal },
  ): Promise<McapReader> {
    file.setAbortSignal(options?.abortSignal);
    const decompressHandlers = await loadDecompressHandlers();
    const reader = await McapIndexedReader.Initialize({
      readable: file,
      decompressHandlers,
    });
    return new McapReader(file, reader, {
      cache: options?.cache ?? true,
    });
  }

  constructor(
    file: Readable,
    reader: McapIndexedReader,
    options?: InitializationOptions,
  ) {
    this.#chunks = new MessageChunks(reader.chunkIndexes);
    this.#file = file;
    this.#options = {
      cache: options?.cache ?? true,
    };
    this.#reader = reader;

    for (const [index, channel] of reader.channelsById) {
      const schema = reader.schemasById.get(channel.schemaId);
      if (schema === undefined) {
        throw new McapParsingError(
          `Schema not found for channel ${channel.topic} (id: ${channel.schemaId})`,
        );
      }
      this.#channelInfoById.set(index, {
        channel,
        schema,
        deserializer: makeDeserializer(channel, schema),
      });
    }
  }

  public get startTime(): bigint | undefined {
    return this.#reader.statistics?.messageStartTime;
  }

  public get endTime(): bigint | undefined {
    return this.#reader.statistics?.messageEndTime;
  }

  public get chunkIndices(): readonly McapTypes.ChunkIndex[] {
    return this.#reader.chunkIndexes;
  }

  public getLatestMessageAsOfTime(
    time: bigint,
  ): Message<MessageData> | undefined {
    const chunk = this.#chunks.getChunkAsOfTime(time);
    if (chunk === undefined || !chunk.isLoaded()) {
      return;
    }
    return chunk.getMessageAsOfTime(time);
  }

  public async loadMessageAsOfTime(
    time: bigint,
    options?: Partial<LatestMessageAsOfTimeOptions>,
  ): Promise<Message<MessageData> | undefined> {
    const chunk = this.#chunks.getChunkAsOfTime(time);
    if (chunk === undefined) {
      return;
    }
    if (chunk.requiresLoad()) {
      await this.loadMessages({
        startTime: chunk.start,
        endTime: chunk.end + (options?.bufferDurationNs ?? 0n),
        topics: options?.topics,
        abortSignal: options?.abortSignal,
      });
    }
    if (chunk.isLoading()) {
      const loaded = await chunk.waitUntilLoaded(options?.abortSignal);
      if (!loaded) {
        return;
      }
    }
    return chunk.getMessageAsOfTime(time);
  }

  public async loadMessages(
    options?: Partial<LoadMessagesOptions>,
  ): Promise<Message<MessageData>[]> {
    const topicStartTime = this.startTime;
    const topicEndTime = this.endTime;

    const startTime = options?.startTime ?? topicStartTime;
    const endTime = options?.endTime ?? topicEndTime;

    const chunks = this.#chunks.getChunksInRange({
      start: startTime,
      end: endTime,
    });

    this.#file.setAbortSignal(options?.abortSignal);

    // Load chunks in batches, yielding to the event loop between each batch
    // to allow other tasks to run (like those that would set an abort signal)
    const unloadedChunks = chunks.filter((chunk) => chunk.requiresLoad());
    while (unloadedChunks.length > 0) {
      if (options?.abortSignal?.aborted) {
        break;
      }
      const batch = unloadedChunks.splice(0, 3);
      await Promise.allSettled(
        batch.map(async (chunk) => {
          if (options?.abortSignal?.aborted) {
            return;
          }
          chunk.beginLoading();
          try {
            const messageStream = this.#reader.readMessages({
              topics: options?.topics,
              startTime: chunk.start,
              endTime: chunk.end,
            });

            const messages = [];
            for await (const message of messageStream) {
              const channelInfo = this.#channelInfoById.get(message.channelId);
              if (channelInfo === undefined) {
                // This should only happen in the case of an improperly written MCAP file
                chunk.loadingDidError();
                return;
              }
              const data = channelInfo.deserializer(
                message.data,
              ) as MessageData;
              messages.push({
                data,
                logTime: message.logTime,
                publishTime: message.publishTime,
                sequence: message.sequence,
              });
            }

            chunk.messagesLoaded(messages);
          } catch (err) {
            if (err instanceof DOMException && err.name === "AbortError") {
              chunk.unload();
              return;
            }
            chunk.loadingDidError();
          }
        }),
      );
    }

    // Wait for chunks that are currently loading, however unlikely.
    const loadingChunks = chunks.filter((chunk) => chunk.isLoading());
    if (loadingChunks.length > 0) {
      await Promise.all(
        loadingChunks.map((chunk) =>
          chunk.waitUntilLoaded(options?.abortSignal),
        ),
      );
    }

    if (options?.abortSignal?.aborted) {
      throw new DOMException("Aborted", "AbortError");
    }

    const messages = chunks.reduce((messages, chunk) => {
      return messages.concat(
        chunk.getMessagesInRange({ start: startTime, end: endTime }),
      );
    }, [] as Message<MessageData>[]);

    if (!this.#options.cache) {
      // Unload messages from chunk if we're not caching messages
      chunks.forEach((chunk) => chunk.unload());
    }

    return messages;
  }
}
