import {
  ConsumeOptions,
  PublishMessageOptions,
  Queue,
  QueueConsumer,
  QueueMessage,
  Subscription,
  SubscriptionOptions,
  TopicInstance,
} from './queue.js';
import { getOrCreate } from '../utils/get-or-create.js';
import { Runnable } from './runnable.js';
import { TopicDefinition } from './define-topic.js';

export interface MemoryQueue extends Queue {
  whenStable(): Promise<void>;
}

export type MemoryHandler<Topic extends TopicDefinition<any, any>> = (
  message: QueueMessage<Topic['data'], Topic['attributes']>,
) => Promise<void>;

interface MemoryTopic {
  subscriptions: Record<
    string,
    {
      messages: { data: unknown; attributes: Record<string, string> }[];
      handler: {
        consumer: MemoryHandler<any>;
        chain: Promise<void>;
        subscriptionOptions: SubscriptionOptions<any>;
      } | null;
    }
  >;
}

export function createMemoryQueue(): MemoryQueue {
  const topics: Record<string, MemoryTopic> = {};
  return {
    async whenStable(): Promise<void> {
      await Promise.allSettled(
        Object.values(topics)
          .map((t) => Object.values(t.subscriptions))
          .flat()
          .map((s) => s.handler?.chain)
          .filter((c) => !!c),
      );
    },
    async createTopic<Topic extends TopicDefinition<any, any>>(
      topic: Topic,
    ): Promise<TopicInstance<Topic>> {
      const existing = getOrCreate<MemoryTopic>(topics, topic.name, () => ({
        subscriptions: {},
      }));
      return {
        async publish(
          message: Topic['data']['_output'],
          options: PublishMessageOptions<Topic['attributes']>,
        ): Promise<void> {
          for (const subscription of Object.values(existing.subscriptions)) {
            const handler = subscription.handler;
            if (handler) {
              handler.chain = handler.chain.then(() =>
                handler
                  .consumer({
                    data: message,
                    attributes: options.attributes ?? {},
                  })
                  .catch((err) => {
                    console.error(err);
                  }),
              );
            } else {
              subscription.messages.push({
                data: message,
                attributes: options.attributes ?? {},
              });
            }
          }
        },
      };
    },
    async createSubscription<Topic extends TopicDefinition<any, any>>(
      topic: Topic,
      subscription: string,
      subscriptionOptions: SubscriptionOptions<Topic['attributes']>,
    ): Promise<Subscription<Topic>> {
      const existing = topics[topic.name];
      if (!existing) {
        throw new Error('unknown topic');
      }

      const sub = getOrCreate(existing.subscriptions, subscription, () => ({
        messages: [],
        handler: null,
      }));

      return {
        consume(
          callback: QueueConsumer<Topic>,
          opts: ConsumeOptions,
        ): Runnable<void> {
          if (sub.handler) {
            throw new Error('already consuming');
          }
          const handler = {
            consumer: (
              message: QueueMessage<Topic['data'], Topic['attributes']>,
            ) => callback(message, { signal: opts.signal }),
            chain: Promise.resolve(),
            subscriptionOptions,
          };
          sub.handler = handler;
          const promise = new Promise<void>((resolve) => {
            opts.signal.addEventListener(
              'abort',
              () => {
                sub.handler = null;
                handler.chain.then(() => {
                  resolve();
                });

                if (subscriptionOptions.autoRemove) {
                  delete existing.subscriptions[subscription];
                }
              },
              { once: true },
            );
          });
          for (const message of sub.messages) {
            handler.chain = handler.chain.then(() =>
              handler.consumer(message).catch((err) => {
                console.error(err);
              }),
            );
          }
          sub.messages = [];

          return {
            ready: Promise.resolve({
              value: undefined,
              done: promise,
            }),
          };
        },
        async remove(): Promise<void> {
          if (existing.subscriptions[subscription]) {
            delete existing.subscriptions[subscription];
          }
        },
      };
    },
  };
}
