import { Queue, QueueMessage } from '../../queue/queue.js';
import { stateUpdateTopic } from '../../management/aggregation-attach-topic.js';
import { generateId } from '../../management/generate-id.js';
import { createAsyncSub } from './create-async-sub.js';

/**
 * Streams queue messages published on `stateUpdateTopic` for one storage/state
 * pair.
 *
 * On first iteration the generator:
 *  1. calls `queue.createTopic(stateUpdateTopic)`,
 *  2. creates a subscription on that topic named
 *     `<storageName>:<generated id>`, filtered on `storageName` and `state`,
 *  3. delegates to `createAsyncSub`, re-yielding everything it produces.
 *
 * @param opts.storageName - Used as the subscription-name prefix and as a
 *   filter attribute.
 * @param opts.state - Used as a filter attribute.
 * @param opts.queue - Queue on which the topic and subscription are created.
 * @param opts.signal - Forwarded unchanged to `createAsyncSub`
 *   (presumably ends the stream on abort — confirm in `create-async-sub`).
 * @returns An async generator of messages typed after `stateUpdateTopic`'s
 *   `data` and `attributes`.
 */
export async function* createRunnable(opts: {
  storageName: string;
  state: string;
  queue: Queue;
  signal: AbortSignal;
}): AsyncGenerator<
  QueueMessage<
    (typeof stateUpdateTopic)['data'],
    (typeof stateUpdateTopic)['attributes']
  >
> {
  // The topic is created before subscribing.
  // NOTE(review): this runs on every invocation, so `createTopic` is
  // presumably idempotent — confirm against the Queue implementation.
  await opts.queue.createTopic(stateUpdateTopic);

  const { queue, storageName, state } = opts;

  // A freshly generated id is appended so each call gets its own subscription name.
  const subscriptionName = `${storageName}:${generateId()}`;
  const filter = { storageName, state };

  const subscription = await queue.createSubscription(
    stateUpdateTopic,
    subscriptionName,
    {
      // NOTE(review): `autoRemove` semantics are defined by the Queue
      // implementation; it looks like the subscription is meant to be
      // cleaned up automatically — confirm.
      autoRemove: true,
      filter,
    },
  );

  // Delegate iteration (including return/throw forwarding) to the helper.
  yield* createAsyncSub(subscription, opts.signal);
}
