import * as Core from "../core.js";
import { AnthropicError, APIUserAbortError } from "../error.js";
import {
  type BetaContentBlock,
  Messages as BetaMessages,
  type BetaMessage,
  type BetaRawMessageStreamEvent as BetaMessageStreamEvent,
  type BetaMessageParam,
  type MessageCreateParams as BetaMessageCreateParams,
  type MessageCreateParamsBase as BetaMessageCreateParamsBase,
  type BetaTextBlock,
  type BetaTextCitation,
} from "../resources/beta/messages/messages.js";
import { type ReadableStream, type Response } from "../_shims/index.js";
import { Stream } from "../streaming.js";
import { partialParse } from "../_vendor/partial-json-parser/parser.js";

/**
 * Map of event name to listener signature for the events a `BetaMessageStream`
 * can emit. Used to type `on()`, `off()`, `once()` and `emitted()`.
 */
export interface MessageStreamEvents {
  /** Emitted once the stream is connected, right after the connected promise is resolved. */
  connect: () => void;
  /** Emitted for every raw stream event, together with the message accumulated so far. */
  streamEvent: (event: BetaMessageStreamEvent, snapshot: BetaMessage) => void;
  /** Emitted for each `text_delta`, with the delta and the accumulated text of the current text block. */
  text: (textDelta: string, textSnapshot: string) => void;
  /** Emitted for each `citations_delta`, with the new citation and the citations accumulated on the current text block. */
  citation: (citation: BetaTextCitation, citationsSnapshot: BetaTextCitation[]) => void;
  /** Emitted for each `input_json_delta` on a `tool_use` block, with the raw JSON fragment and the partially parsed input. */
  inputJson: (partialJson: string, jsonSnapshot: unknown) => void;
  /** Emitted when a `message_stop` event completes a message. */
  message: (message: BetaMessage) => void;
  /** Emitted on `content_block_stop` with the last content block of the message snapshot. */
  contentBlock: (content: BetaContentBlock) => void;
  /** Emitted after the request finished successfully, with the last received message (if any), just before `end`. */
  finalMessage: (message: BetaMessage) => void;
  /** Emitted when the stream fails; non-`AnthropicError` failures are wrapped in an `AnthropicError`. */
  error: (error: AnthropicError) => void;
  /** Emitted when the stream was aborted. */
  abort: (error: APIUserAbortError) => void;
  /** Always the last event; nothing is emitted after it. Also follows `error` and `abort`. */
  end: () => void;
}

/**
 * The callbacks registered for one event. Entries flagged with `once` are
 * dropped from the list the first time the event fires.
 */
type MessageStreamEventListeners<Event extends keyof MessageStreamEvents> = Array<{
  listener: MessageStreamEvents[Event];
  once?: boolean;
}>;

/**
 * Name of the hidden (non-enumerable) property on a `tool_use` content block
 * that holds the raw JSON text received so far for that block's `input`.
 */
const JSON_BUF_PROPERTY = '__json_buf' as const;

/**
 * Event-emitter style wrapper around a streamed beta Messages response.
 *
 * The stream accumulates raw stream events into a `BetaMessage` snapshot, emits
 * higher-level events (see `MessageStreamEvents`), exposes promise helpers such as
 * `done()`, `finalMessage()` and `finalText()`, and can be consumed with
 * `for await` to receive the raw events.
 *
 * If the stream fails or is aborted and the caller has neither registered a
 * listener for that event nor called one of the promise-returning helpers, an
 * unhandled promise rejection is raised on purpose so the failure is not lost.
 */
export class BetaMessageStream implements AsyncIterable<BetaMessageStreamEvent> {
  /** Message params recorded for this stream: the request messages plus every completed message. */
  messages: BetaMessageParam[] = [];
  /** Messages completed by a `message_stop` event, in arrival order. */
  receivedMessages: BetaMessage[] = [];
  /** The message currently being accumulated, or `undefined` between requests. */
  #currentMessageSnapshot: BetaMessage | undefined;

  /** Controller whose signal is passed to the request; `abort()` triggers it. */
  controller: AbortController = new AbortController();

  #connectedPromise: Promise<Response | null>;
  #resolveConnectedPromise: (response: Response | null) => void = () => {};
  #rejectConnectedPromise: (error: AnthropicError) => void = () => {};

  #endPromise: Promise<void>;
  #resolveEndPromise: () => void = () => {};
  #rejectEndPromise: (error: AnthropicError) => void = () => {};

  #listeners: { [Event in keyof MessageStreamEvents]?: MessageStreamEventListeners<Event> } = {};

  #ended = false;
  #errored = false;
  #aborted = false;
  /** Set once the caller holds a promise that will surface errors, see `_emit()`. */
  #catchingPromiseCreated = false;
  #response: Response | null | undefined;
  #request_id: string | null | undefined;

  constructor() {
    this.#connectedPromise = new Promise<Response | null>((resolve, reject) => {
      this.#resolveConnectedPromise = resolve;
      this.#rejectConnectedPromise = reject;
    });

    this.#endPromise = new Promise<void>((resolve, reject) => {
      this.#resolveEndPromise = resolve;
      this.#rejectEndPromise = reject;
    });

    // Don't let these promises cause unhandled rejection errors.
    // We will manually cause an unhandled rejection error later
    // if the user hasn't registered any error listener or called
    // any promise-returning method.
    this.#connectedPromise.catch(() => {});
    this.#endPromise.catch(() => {});
  }

  /**
   * The raw `Response`, available once connected. `undefined` before that and
   * `null` when the stream was created from a `ReadableStream`.
   */
  get response(): Response | null | undefined {
    return this.#response;
  }

  /** The value of the `request-id` response header, available once connected. */
  get request_id(): string | null | undefined {
    return this.#request_id;
  }

  /**
   * Returns the `MessageStream` data, the raw `Response` instance and the ID of the request,
   * returned via the `request-id` header which is useful for debugging requests and reporting
   * issues to Anthropic.
   *
   * This is the same as the `APIPromise.withResponse()` method.
   *
   * This method will raise an error if you created the stream using `MessageStream.fromReadableStream`
   * as no `Response` is available.
   *
   * The returned promise rejects if the stream errors or is aborted before it connects.
   */
  async withResponse(): Promise<{
    data: BetaMessageStream;
    response: Response;
    request_id: string | null | undefined;
  }> {
    // The caller now holds a promise that surfaces connection failures, so
    // `_emit()` must not additionally raise an unhandled rejection for them.
    this.#catchingPromiseCreated = true;
    const response = await this.#connectedPromise;
    if (!response) {
      throw new Error('Could not resolve a `Response` object');
    }

    return {
      data: this,
      response,
      request_id: response.headers.get('request-id'),
    };
  }

  /**
   * Intended for use on the frontend, consuming a stream produced with
   * `.toReadableStream()` on the backend.
   *
   * Note that messages sent to the model do not appear in `.on('message')`
   * in this context.
   */
  static fromReadableStream(stream: ReadableStream): BetaMessageStream {
    const runner = new BetaMessageStream();
    runner._run(() => runner._fromReadableStream(stream));
    return runner;
  }

  /**
   * Creates a stream and immediately starts a streaming create request on `messages`.
   *
   * The request messages are recorded in `.messages`, `stream: true` is forced and the
   * `X-Stainless-Helper-Method: stream` header is added to the request options.
   */
  static createMessage(
    messages: BetaMessages,
    params: BetaMessageCreateParamsBase,
    options?: Core.RequestOptions,
  ): BetaMessageStream {
    const runner = new BetaMessageStream();
    for (const message of params.messages) {
      runner._addMessageParam(message);
    }
    runner._run(() =>
      runner._createMessage(
        messages,
        { ...params, stream: true },
        { ...options, headers: { ...options?.headers, 'X-Stainless-Helper-Method': 'stream' } },
      ),
    );
    return runner;
  }

  /**
   * Runs `executor` and translates its outcome into events: `finalMessage` and `end`
   * on success, `abort` or `error` (followed by `end`) on failure.
   */
  protected _run(executor: () => Promise<any>) {
    executor().then(() => {
      this._emitFinal();
      this._emit('end');
    }, this.#handleError);
  }

  /** Records a message param in `.messages`. */
  protected _addMessageParam(message: BetaMessageParam) {
    this.messages.push(message);
  }

  /** Records a completed message in `.receivedMessages` and, unless `emit` is false, emits `message`. */
  protected _addMessage(message: BetaMessage, emit = true) {
    this.receivedMessages.push(message);
    if (emit) {
      this._emit('message', message);
    }
  }

  /**
   * Forwards aborts of a caller-supplied signal to this stream's controller.
   *
   * If the signal is already aborted the controller is aborted immediately.
   *
   * @returns a cleanup function that removes the listener from the signal again.
   */
  #linkAbortSignal(signal: Core.RequestOptions['signal']): () => void {
    if (!signal) return () => {};
    if (signal.aborted) this.controller.abort();
    const onAbort = () => this.controller.abort();
    signal.addEventListener('abort', onAbort);
    return () => signal.removeEventListener('abort', onAbort);
  }

  /**
   * Performs the streaming request and feeds every received event into the stream.
   *
   * Throws `APIUserAbortError` if the stream's signal was aborted by the time the
   * event loop finished.
   */
  protected async _createMessage(
    messages: BetaMessages,
    params: BetaMessageCreateParams,
    options?: Core.RequestOptions,
  ): Promise<void> {
    const unlinkAbortSignal = this.#linkAbortSignal(options?.signal);
    try {
      this.#beginRequest();
      const { response, data: stream } = await messages
        .create({ ...params, stream: true }, { ...options, signal: this.controller.signal })
        .withResponse();
      this._connected(response);
      for await (const event of stream) {
        this.#addStreamEvent(event);
      }
      if (stream.controller.signal?.aborted) {
        throw new APIUserAbortError();
      }
      this.#endRequest();
    } finally {
      // Don't keep this stream reachable from a long-lived caller signal.
      unlinkAbortSignal();
    }
  }

  /** Stores the response details, resolves the connected promise and emits `connect`. */
  protected _connected(response: Response | null) {
    if (this.ended) return;
    this.#response = response;
    this.#request_id = response?.headers.get('request-id');
    this.#resolveConnectedPromise(response);
    this._emit('connect');
  }

  /** Whether `end` has been emitted. */
  get ended(): boolean {
    return this.#ended;
  }

  /** Whether the stream failed; this is also set when the stream was aborted. */
  get errored(): boolean {
    return this.#errored;
  }

  /** Whether the stream was aborted. */
  get aborted(): boolean {
    return this.#aborted;
  }

  /** Aborts the stream's controller. */
  abort() {
    this.controller.abort();
  }

  /**
   * Adds the listener function to the end of the listeners array for the event.
   * No checks are made to see if the listener has already been added. Multiple calls passing
   * the same combination of event and listener will result in the listener being added, and
   * called, multiple times.
   * @returns this MessageStream, so that calls can be chained
   */
  on<Event extends keyof MessageStreamEvents>(event: Event, listener: MessageStreamEvents[Event]): this {
    const listeners: MessageStreamEventListeners<Event> =
      this.#listeners[event] || (this.#listeners[event] = []);
    listeners.push({ listener });
    return this;
  }

  /**
   * Removes the specified listener from the listener array for the event.
   * off() will remove, at most, one instance of a listener from the listener array. If any single
   * listener has been added multiple times to the listener array for the specified event, then
   * off() must be called multiple times to remove each instance.
   * @returns this MessageStream, so that calls can be chained
   */
  off<Event extends keyof MessageStreamEvents>(event: Event, listener: MessageStreamEvents[Event]): this {
    const listeners = this.#listeners[event];
    if (!listeners) return this;
    const index = listeners.findIndex((l) => l.listener === listener);
    if (index >= 0) listeners.splice(index, 1);
    return this;
  }

  /**
   * Adds a one-time listener function for the event. The next time the event is triggered,
   * this listener is removed and then invoked.
   * @returns this MessageStream, so that calls can be chained
   */
  once<Event extends keyof MessageStreamEvents>(event: Event, listener: MessageStreamEvents[Event]): this {
    const listeners: MessageStreamEventListeners<Event> =
      this.#listeners[event] || (this.#listeners[event] = []);
    listeners.push({ listener, once: true });
    return this;
  }

  /**
   * This is similar to `.once()`, but returns a Promise that resolves the next time
   * the event is triggered, instead of calling a listener callback.
   * @returns a Promise that resolves the next time given event is triggered,
   * or rejects if an error is emitted.  (If you request the 'error' event,
   * returns a promise that resolves with the error).
   *
   * Note that only the `error` event rejects the returned promise; an `abort`
   * event does not.
   *
   * Example:
   *
   *   const message = await stream.emitted('message') // rejects if the stream errors
   */
  emitted<Event extends keyof MessageStreamEvents>(
    event: Event,
  ): Promise<
    Parameters<MessageStreamEvents[Event]> extends [infer Param] ? Param
    : Parameters<MessageStreamEvents[Event]> extends [] ? void
    : Parameters<MessageStreamEvents[Event]>
  > {
    return new Promise((resolve, reject) => {
      this.#catchingPromiseCreated = true;
      if (event !== 'error') this.once('error', reject);
      this.once(event, resolve as any);
    });
  }

  /**
   * @returns a promise that resolves when the stream has ended, or rejects if it
   * errored or was aborted.
   */
  async done(): Promise<void> {
    this.#catchingPromiseCreated = true;
    await this.#endPromise;
  }

  /** The message currently being accumulated, if any. */
  get currentMessage(): BetaMessage | undefined {
    return this.#currentMessageSnapshot;
  }

  /** Returns the last received message, throwing if none was received. */
  #getFinalMessage(): BetaMessage {
    if (this.receivedMessages.length === 0) {
      throw new AnthropicError('stream ended without producing a Message with role=assistant');
    }
    return this.receivedMessages.at(-1)!;
  }

  /**
   * @returns a promise that resolves with the final assistant Message response,
   * or rejects if an error occurred or the stream ended prematurely without producing a Message.
   */
  async finalMessage(): Promise<BetaMessage> {
    await this.done();
    return this.#getFinalMessage();
  }

  /**
   * Returns the text of all text blocks of the last received message, joined by a
   * single space. Throws if there is no message or no text block.
   */
  #getFinalText(): string {
    const textBlocks = this.#getFinalMessage()
      .content.filter((block): block is BetaTextBlock => block.type === 'text')
      .map((block) => block.text);
    if (textBlocks.length === 0) {
      throw new AnthropicError('stream ended without producing a content block with type=text');
    }
    return textBlocks.join(' ');
  }

  /**
   * @returns a promise that resolves with the final assistant Message's text response, joined
   * with a single space if there is more than one text block.
   * Rejects if an error occurred or the stream ended prematurely without producing a Message.
   */
  async finalText(): Promise<string> {
    await this.done();
    return this.#getFinalText();
  }

  /**
   * Normalizes a failure of the executor and emits it: errors named `AbortError` and
   * `APIUserAbortError`s become `abort`, everything else becomes `error`, wrapped in an
   * `AnthropicError` when it is not one already.
   */
  #handleError = (error: unknown) => {
    this.#errored = true;
    if (error instanceof Error && error.name === 'AbortError') {
      error = new APIUserAbortError();
    }
    if (error instanceof APIUserAbortError) {
      this.#aborted = true;
      return this._emit('abort', error);
    }
    if (error instanceof AnthropicError) {
      return this._emit('error', error);
    }
    if (error instanceof Error) {
      const anthropicError: AnthropicError = new AnthropicError(error.message);
      // @ts-ignore
      anthropicError.cause = error;
      return this._emit('error', anthropicError);
    }
    return this._emit('error', new AnthropicError(String(error)));
  };

  /**
   * Invokes the listeners registered for `event` and performs the bookkeeping tied to
   * the terminal events: `end` resolves the end promise, while `abort` and `error`
   * reject the connected and end promises and are then followed by `end`.
   */
  protected _emit<Event extends keyof MessageStreamEvents>(
    event: Event,
    ...args: Parameters<MessageStreamEvents[Event]>
  ) {
    // make sure we don't emit any MessageStreamEvents after end
    if (this.#ended) return;

    if (event === 'end') {
      this.#ended = true;
      this.#resolveEndPromise();
    }

    const listeners: MessageStreamEventListeners<Event> | undefined = this.#listeners[event];
    if (listeners) {
      // Drop one-time listeners before invoking them, so they fire at most once.
      this.#listeners[event] = listeners.filter((l) => !l.once) as any;
      listeners.forEach(({ listener }: any) => listener(...args));
    }

    if (event === 'abort') {
      const error = args[0] as APIUserAbortError;
      if (!this.#catchingPromiseCreated && !listeners?.length) {
        // Nobody is observing the abort: surface it as an unhandled rejection.
        Promise.reject(error);
      }
      this.#rejectConnectedPromise(error);
      this.#rejectEndPromise(error);
      this._emit('end');
      return;
    }

    if (event === 'error') {
      // NOTE: _emit('error', error) should only be called from #handleError().

      const error = args[0] as AnthropicError;
      if (!this.#catchingPromiseCreated && !listeners?.length) {
        // Trigger an unhandled rejection if the user hasn't registered any error handlers.
        // If you are seeing stack traces here, make sure to handle errors via either:
        // - runner.on('error', () => ...)
        // - await runner.done()
        // - await runner.final...()
        // - etc.
        Promise.reject(error);
      }
      this.#rejectConnectedPromise(error);
      this.#rejectEndPromise(error);
      this._emit('end');
    }
  }

  /** Emits `finalMessage` with the last received message, if there is one. */
  protected _emitFinal() {
    const finalMessage = this.receivedMessages.at(-1);
    if (finalMessage) {
      this._emit('finalMessage', finalMessage);
    }
  }

  /** Resets the message snapshot at the start of a request. */
  #beginRequest() {
    if (this.ended) return;
    this.#currentMessageSnapshot = undefined;
  }

  /**
   * Accumulates a raw stream event into the message snapshot, emits `streamEvent`
   * and then the higher-level event that corresponds to it, if any.
   */
  #addStreamEvent(event: BetaMessageStreamEvent) {
    if (this.ended) return;
    const messageSnapshot = this.#accumulateMessage(event);
    this._emit('streamEvent', event, messageSnapshot);

    switch (event.type) {
      case 'content_block_delta': {
        const content = messageSnapshot.content.at(-1)!;
        switch (event.delta.type) {
          case 'text_delta': {
            if (content.type === 'text') {
              this._emit('text', event.delta.text, content.text || '');
            }
            break;
          }
          case 'citations_delta': {
            if (content.type === 'text') {
              this._emit('citation', event.delta.citation, content.citations ?? []);
            }
            break;
          }
          case 'input_json_delta': {
            if (content.type === 'tool_use' && content.input) {
              this._emit('inputJson', event.delta.partial_json, content.input);
            }
            break;
          }
          default:
            checkNever(event.delta);
        }
        break;
      }
      case 'message_stop': {
        this._addMessageParam(messageSnapshot);
        this._addMessage(messageSnapshot, true);
        break;
      }
      case 'content_block_stop': {
        this._emit('contentBlock', messageSnapshot.content.at(-1)!);
        break;
      }
      case 'message_start': {
        this.#currentMessageSnapshot = messageSnapshot;
        break;
      }
      case 'content_block_start':
      case 'message_delta':
        break;
    }
  }

  /**
   * Clears and returns the message snapshot at the end of a request.
   * Throws if the stream has already ended or no message was started.
   */
  #endRequest(): BetaMessage {
    if (this.ended) {
      throw new AnthropicError(`stream has ended, this shouldn't happen`);
    }
    const snapshot = this.#currentMessageSnapshot;
    if (!snapshot) {
      throw new AnthropicError(`request ended without sending any chunks`);
    }
    this.#currentMessageSnapshot = undefined;
    return snapshot;
  }

  /**
   * Feeds the events read from `readableStream` into this stream. No `Response` is
   * available in this mode, so the stream connects with `null`.
   *
   * Throws `APIUserAbortError` if the stream's signal was aborted by the time the
   * event loop finished.
   */
  protected async _fromReadableStream(
    readableStream: ReadableStream,
    options?: Core.RequestOptions,
  ): Promise<void> {
    const unlinkAbortSignal = this.#linkAbortSignal(options?.signal);
    try {
      this.#beginRequest();
      this._connected(null);
      const stream = Stream.fromReadableStream<BetaMessageStreamEvent>(readableStream, this.controller);
      for await (const event of stream) {
        this.#addStreamEvent(event);
      }
      if (stream.controller.signal?.aborted) {
        throw new APIUserAbortError();
      }
      this.#endRequest();
    } finally {
      // Don't keep this stream reachable from a long-lived caller signal.
      unlinkAbortSignal();
    }
  }

  /**
   * Applies the event to the current message snapshot (mutating it in place) and returns
   * the snapshot. For `message_start` the event's message is returned as the new snapshot;
   * storing it in `#currentMessageSnapshot` is left to the caller.
   *
   * Accumulating multiple messages has to be handled by the caller: this method throws on a
   * `message_start` while a snapshot is still set, and on any other event when there is none.
   */
  #accumulateMessage(event: BetaMessageStreamEvent): BetaMessage {
    let snapshot = this.#currentMessageSnapshot;

    if (event.type === 'message_start') {
      if (snapshot) {
        throw new AnthropicError(`Unexpected event order, got ${event.type} before receiving "message_stop"`);
      }
      return event.message;
    }

    if (!snapshot) {
      throw new AnthropicError(`Unexpected event order, got ${event.type} before "message_start"`);
    }

    switch (event.type) {
      case 'message_stop':
        return snapshot;
      case 'message_delta':
        snapshot.stop_reason = event.delta.stop_reason;
        snapshot.stop_sequence = event.delta.stop_sequence;
        snapshot.usage.output_tokens = event.usage.output_tokens;
        return snapshot;
      case 'content_block_start':
        snapshot.content.push(event.content_block);
        return snapshot;
      case 'content_block_delta': {
        const snapshotContent = snapshot.content.at(event.index);

        switch (event.delta.type) {
          case 'text_delta': {
            if (snapshotContent?.type === 'text') {
              snapshotContent.text += event.delta.text;
            }
            break;
          }
          case 'citations_delta': {
            if (snapshotContent?.type === 'text') {
              snapshotContent.citations ??= [];
              snapshotContent.citations.push(event.delta.citation);
            }
            break;
          }
          case 'input_json_delta': {
            if (snapshotContent?.type === 'tool_use') {
              // we need to keep track of the raw JSON string as well so that we can
              // re-parse it for each delta, for now we just store it as an untyped
              // non-enumerable property on the snapshot
              let jsonBuf = (snapshotContent as any)[JSON_BUF_PROPERTY] || '';
              jsonBuf += event.delta.partial_json;

              Object.defineProperty(snapshotContent, JSON_BUF_PROPERTY, {
                value: jsonBuf,
                enumerable: false,
                writable: true,
              });

              if (jsonBuf) {
                snapshotContent.input = partialParse(jsonBuf);
              }
            }
            break;
          }
          default:
            checkNever(event.delta);
        }
        return snapshot;
      }
      case 'content_block_stop':
        return snapshot;
    }
  }

  /**
   * Iterates over the raw stream events. Events that arrive while nobody is waiting are
   * buffered. Pending reads resolve as finished on `end` and reject on `abort` / `error`.
   * Leaving the loop early (the iterator's `return()`) aborts the stream.
   */
  [Symbol.asyncIterator](): AsyncIterator<BetaMessageStreamEvent> {
    // Events received while no `next()` call is waiting.
    const pushQueue: BetaMessageStreamEvent[] = [];
    // `next()` calls waiting for an event.
    const readQueue: {
      resolve: (chunk: BetaMessageStreamEvent | undefined) => void;
      reject: (error: unknown) => void;
    }[] = [];
    let done = false;

    const rejectPendingReads = (err: unknown) => {
      done = true;
      for (const reader of readQueue) {
        reader.reject(err);
      }
      readQueue.length = 0;
    };

    this.on('streamEvent', (event) => {
      const reader = readQueue.shift();
      if (reader) {
        reader.resolve(event);
      } else {
        pushQueue.push(event);
      }
    });

    this.on('end', () => {
      done = true;
      for (const reader of readQueue) {
        reader.resolve(undefined);
      }
      readQueue.length = 0;
    });

    this.on('abort', rejectPendingReads);
    this.on('error', rejectPendingReads);

    return {
      next: async (): Promise<IteratorResult<BetaMessageStreamEvent>> => {
        if (!pushQueue.length) {
          if (done) {
            return { value: undefined, done: true };
          }
          return new Promise<BetaMessageStreamEvent | undefined>((resolve, reject) =>
            readQueue.push({ resolve, reject }),
          ).then((chunk) => (chunk ? { value: chunk, done: false } : { value: undefined, done: true }));
        }
        const chunk = pushQueue.shift()!;
        return { value: chunk, done: false };
      },
      return: async () => {
        this.abort();
        return { value: undefined, done: true };
      },
    };
  }

  /** Wraps this stream's events in a `Stream` sharing this controller and converts it to a `ReadableStream`. */
  toReadableStream(): ReadableStream {
    const stream = new Stream(this[Symbol.asyncIterator].bind(this), this.controller);
    return stream.toReadableStream();
  }
}

/**
 * Compile-time exhaustiveness guard for `switch` statements: the argument only
 * type-checks once every variant has been handled. It deliberately does nothing
 * at runtime, so an unexpected variant is ignored instead of throwing.
 */
function checkNever(x: never): void {
  // intentionally empty
}
