import { Queue, QueueConsumer, SubscriptionOptions } from '../queue/queue.js';
import { runnableAsyncRunnable } from '../queue/runnable.js';
import { TopicDefinition } from '../queue/define-topic.js';

/**
 * Creates a topic, creates a subscription on it, and starts consuming from
 * that subscription, all as one chained asynchronous operation.
 *
 * The steps run strictly in sequence: `queue.createTopic(topic)` first, then
 * `queue.createSubscription(topic, subscription, { filter, autoRemove })`,
 * and finally `consume(callback, { signal })` on the resulting subscription.
 * The promise for the whole chain is handed to `runnableAsyncRunnable`, and
 * whatever that helper returns is returned to the caller.
 *
 * @param opts - The queue to operate on, the topic definition, the
 *   subscription name, the `AbortSignal` forwarded to `consume`, plus the
 *   subscription options (`filter`, `autoRemove`) forwarded to
 *   `createSubscription`.
 * @param callback - Consumer passed to the subscription's `consume` call.
 * @returns The value produced by `runnableAsyncRunnable` for the chained
 *   promise (its exact shape is defined in `../queue/runnable.js`).
 */
export function createTopicSubscribeAndConsume<
  Topic extends TopicDefinition<any, any>,
>(
  opts: {
    queue: Queue;
    topic: Topic;
    subscription: string;
    signal: AbortSignal;
  } & SubscriptionOptions<Topic['attributes']>,
  callback: QueueConsumer<Topic>,
) {
  return runnableAsyncRunnable(
    opts.queue
      .createTopic(opts.topic)
      // The subscription is only created once the topic creation has settled.
      .then(() =>
        opts.queue.createSubscription(opts.topic, opts.subscription, {
          filter: opts.filter,
          autoRemove: opts.autoRemove,
        }),
      )
      // The chain resolves with whatever `consume` yields.
      .then((activeSubscription) =>
        activeSubscription.consume(callback, { signal: opts.signal }),
      ),
  );
}
