Working with the new Task Service Worker

Hey Folks,

As you might know, v9 introduced a new way of working with Automated Tasks. It is a wee bit more complicated to use than what we are used to from the old Jobs system.
However, it does have the advantage of a proper queuing mechanism, with a background service to process the queue.

I am trying to make use of that, but I hit a brick wall as soon as I try to make my command work asynchronously.
I’ve got my task and commands set up, and they run smoothly when started directly from the CLI and run synchronously, like most core tasks do.

When I add the “AsyncCommandInterface” to the command, I can still run the command, and I can tell from the MessengerMessages table in my DB that my messages are being queued.
Then, when I start the service worker by running messenger:consume async, I get an exception:

[Symfony\Component\Messenger\Exception\MessageDecodingFailedException]
Could not decode stamp: Cannot create an instance of “Concrete\Core\Command\Task\Output\ConsoleOutput” from serialized data because its constructor requires parameter “symfonyOutput” to be present.
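
My rough understanding of what that message describes, sketched in plain PHP: when a stored object is rebuilt, the stored fields are matched to the constructor parameters by name, and a required parameter with no stored value cannot be filled. FakeConsoleOutput and rebuildFromData() are made-up stand-ins for illustration, not Concrete or Symfony code, and the empty stored data is an assumption on my part.

```php
<?php
// Hypothetical stand-in for an output class that wraps a live console stream.
final class FakeConsoleOutput
{
    private $symfonyOutput;

    public function __construct($symfonyOutput)
    {
        $this->symfonyOutput = $symfonyOutput;
    }
}

// Rough imitation of constructor-based rebuilding: every constructor
// parameter must be found in the stored data or have a default value.
function rebuildFromData(string $class, array $data): object
{
    $constructor = (new ReflectionClass($class))->getConstructor();
    $args = [];
    foreach ($constructor ? $constructor->getParameters() : [] as $param) {
        $name = $param->getName();
        if (array_key_exists($name, $data)) {
            $args[] = $data[$name];
        } elseif ($param->isDefaultValueAvailable()) {
            $args[] = $param->getDefaultValue();
        } else {
            throw new RuntimeException(sprintf(
                'Cannot create an instance of "%s" because its constructor requires parameter "%s" to be present.',
                $class,
                $name
            ));
        }
    }

    return new $class(...$args);
}

// Assumption: nothing usable was stored for the output object.
$stored = [];

try {
    rebuildFromData(FakeConsoleOutput::class, $stored);
} catch (RuntimeException $e) {
    echo $e->getMessage(), PHP_EOL;
}
```

If that picture is right, the queued message carries something tied to the console session that started the task, which the worker process has no way to recreate.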

I’ve looked at the ReindexPageCommand as an example a hundred times and tried tweaking various things in my commands and handlers, with no progress at all.

Does anyone here have an idea what I could be doing wrong? The Page Reindexing task runs fine, and it does things in a very similar fashion.

I have also noticed a few shortcomings of the service worker in general and opened an issue here:

Can you post the actual code of your task controller, along with the code for any commands that the task controller creates?

Posting the whole job would have been a bit much; it does a lot. But I stripped it down to a minimal example that still yields the same error when the messenger service worker comes across it:

This is the Task Controller:

class DummyController extends AbstractController
{
    public function getName(): string
    {
        return t('Dummy Task');
    }

    public function getDescription(): string
    {
        return t('Does nothing.');
    }

    public function getTaskRunner(TaskInterface $task, InputInterface $input): TaskRunnerInterface
    {
        $this->app = Application::getFacadeApplication();
        $batch = Batch::create('Dummy Batch');
        $count = 100;
        // Use <= so that exactly $count commands are queued, matching the message below.
        for ($i = 1; $i <= $count; $i++) {
            $batch->add(new DummyCommand($i));
        }

        return new BatchProcessTaskRunner($task, $batch, $input, t('Processing Dummy Task started. %s elements to process...', $count));
    }
}

The Command:

class DummyCommand extends Command implements AsyncCommandInterface
{
    public $row;

    public function __construct($row)
    {
        $this->row = $row;
    }

    public static function getHandler(): string
    {
        return DummyCommandHandler::class;
    }
}

And the CommandHandler:

class DummyCommandHandler implements OutputAwareInterface
{
    use OutputAwareTrait;

    public function __invoke(DummyCommand $command)
    {
        $this->app = Application::getFacadeApplication();
        try {
            $msg = 'Dummy command executed for row '.$command->row;
            Log::addInfo($msg);
            return true;
        } catch (\Exception $e) {
            Log::addWarning('Error occurred while processing dummy command: '.$e->getMessage());
            return false;
        }
    }
}