import { TopicDefinition } from '../../queue/define-topic.js';
import { QueueMessage, Subscription } from '../../queue/queue.js';
import { CustomError } from '../../custom-error.js';

export async function* createAsyncSub<T extends TopicDefinition<any, any>>(
  sub: Subscription<T>,
  signal: AbortSignal,
): AsyncGenerator<QueueMessage<T['data'], T['attributes']>> {
  let nextResolve:
    | ((val: QueueMessage<T['data'], T['attributes']>) => void)
    | null = null;
  let nextReject: (err: Error) => void;
  const buffer: QueueMessage<T['data'], T['attributes']>[] = []; // TODO improve if possible, potential memory increase

  const runnable = sub.consume(
    async (msg) => {
      if (nextResolve) {
        nextResolve(msg);
        nextResolve = null;
      } else {
        buffer.push(msg);
      }
    },
    { signal },
  );

  await runnable.ready;

  if (signal.aborted) {
    return;
  }

  signal.addEventListener(
    'abort',
    () => {
      if (nextReject) {
        nextReject(new CustomError('abort', null, {}));
      }
    },
    true,
  );

  do {
    const bufferItem = buffer.pop();
    if (bufferItem) {
      yield bufferItem;
    } else {
      const nextItemPromise = new Promise<
        QueueMessage<T['data'], T['attributes']>
      >((resolve, reject) => {
        nextResolve = resolve;
        nextReject = reject;
      });

      try {
        yield await nextItemPromise;
      } catch (e) {
        if (e instanceof CustomError && e.message === 'abort') {
          return;
        }
        throw e;
      }
    }
  } while (!signal.aborted);
}
