How to start and stop Mule flows from a client over HTTP

You may have a use case where a flow in your Mule application must be started or stopped based on a condition, or on a trigger from an external system.

For each Mule flow, the initial state of the flow can be configured.
The default value is ‘– Empty –’, which means that the flow is started initially when the application is deployed.

The following screenshot shows the properties window of a flow in Anypoint Studio.
The initial state can be set here. Be aware that this is only possible for ‘normal’ flows, not for subflows.
[Screenshot: flow_properties]

In this blog post I will explain, using a simple example application, how you can control a flow (start/stop) and how to get the status of a flow (started/stopped).
The complete application XML is available at the bottom of this post.
The application has three flows:
#1: triggerFlow
#2: statusFlow
#3: processFlow

[Screenshot: three_flows_overview]

A system or client can start and stop the processFlow via the triggerFlow. The action parameter accepts the values start and stop.

The statusFlow returns the current status of the processFlow: started or stopped.

You can also use this approach to control flows from inside a Mule application, using the same Groovy script.

Each flow is explained in detail below.

triggerFlow

[Screenshot: triggerFlow]

The triggerFlow listens on the path /trigger and uses the query parameter action to start or stop the processFlow.
Example: http://localhost:8081/trigger?action=start

The query parameter is stored in the variable actionVar, which is used in the Groovy script that contains the logic to check the status and actually start or stop the flow.
The script sets the variable statusVar, which is returned to the client by setting it as the payload of the response.
If anything other than start or stop is passed in as the parameter, the application returns invalid with HTTP status code 404.
The same happens when the flow is already started and a client tries to start it again, or stop it when it is already stopped. The statusFlow can be used to check the status before calling the triggerFlow.
This is just an implementation choice in this application; you can solve it in several ways depending on your needs.
In a real-world scenario it would be nicer to return a JSON response with some additional information, but that depends on what the client can interpret.
The http.status outbound property is used as the HTTP status code of the response, which clients can understand and work with.

Successful request
[Screenshot: flow_started]

Invalid request
[Screenshot: flow_invalid]

Groovy script

// Look up the flow once in the Mule registry
def processFlow = muleContext.registry.lookupFlowConstruct('processFlow')

if (flowVars.actionVar == "start" && processFlow.isStopped()) {
    processFlow.start()
    flowVars.statusVar = "started"
} else if (flowVars.actionVar == "stop" && processFlow.isStarted()) {
    processFlow.stop()
    flowVars.statusVar = "stopped"
} else {
    // Unknown action, or the flow is already in the requested state
    flowVars.statusVar = "invalid"
    message.setOutboundProperty('http.status', '404')
}
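
To make the decision table of the script easier to see, here is a minimal sketch in plain Java that can be run without a Mule runtime. FlowToggleModel, FakeFlow and Result are hypothetical names invented for this illustration and are not part of the Mule API; FakeFlow only mimics the four methods the script calls. The 200 status for the success cases is an assumption, based on the script not setting http.status in those branches.

```java
public class FlowToggleModel {

    /** Hypothetical stand-in for a Mule flow: only the four methods the script calls. */
    public static final class FakeFlow {
        private boolean started;
        public FakeFlow(boolean started) { this.started = started; }
        public boolean isStarted() { return started; }
        public boolean isStopped() { return !started; }
        public void start() { started = true; }
        public void stop() { started = false; }
    }

    /** Response body plus HTTP status code for one trigger request. */
    public static final class Result {
        public final String status;
        public final int httpStatus;
        public Result(String status, int httpStatus) {
            this.status = status;
            this.httpStatus = httpStatus;
        }
        @Override public String toString() { return status + " (" + httpStatus + ")"; }
    }

    /** Mirrors the three branches of the Groovy script. */
    public static Result handle(String action, FakeFlow flow) {
        if ("start".equals(action) && flow.isStopped()) {
            flow.start();
            return new Result("started", 200);   // 200 is assumed, see lead-in
        }
        if ("stop".equals(action) && flow.isStarted()) {
            flow.stop();
            return new Result("stopped", 200);
        }
        // Unknown or missing action, or the flow is already in the requested state
        return new Result("invalid", 404);
    }

    public static void main(String[] args) {
        FakeFlow flow = new FakeFlow(false);     // assume the flow starts out stopped
        for (String action : new String[] {"start", "start", "stop", "restart"}) {
            System.out.println(action + " -> " + handle(action, flow));
        }
    }
}
```

Running main shows that the second start and the unknown action restart both end up in the invalid branch, exactly like a repeated or misspelled request would in the triggerFlow.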

The key point here is how to look up a flow in the Mule registry, check its current state and change that state.
The method lookupFlowConstruct takes the flow name as a string and returns the flow, on which several methods are available.
In our example we use:
control:
start()
stop()
check:
isStarted()
isStopped()

Mule API docs:

http://www.mulesoft.org/docs/site/3.7.0/apidocs/org/mule/api/registry/MuleRegistry.html#lookupFlowConstruct(java.lang.String)
http://www.mulesoft.org/docs/site/3.7.0/apidocs/org/mule/construct/AbstractFlowConstruct.html

statusFlow
[Screenshot: statusFlow]

The status of the processFlow is returned when this flow is called via /status.
Again we use Groovy to look up the status, but this time it is a short ternary expression used directly inline:
#[groovy:muleContext.registry.lookupFlowConstruct('processFlow').isStarted() ? 'started' : 'stopped']
Using the ‘groovy:’ prefix is a compact way to put a small piece of Groovy code directly in a transformer.
It checks whether the processFlow is started; if so it returns ‘started’, otherwise it returns ‘stopped’.
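
On the client side, checking /status before calling /trigger avoids the invalid response for a flow that is already in the desired state. The following is a rough sketch of such a client using the HTTP client of Java 11 or later. FlowClient, actionFor and ensureState are hypothetical names, and the base URL assumes that /status is served on the same host and port as the /trigger example above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;

public class FlowClient {

    // Assumption: both flows share the listener from the /trigger example
    static final String BASE_URL = "http://localhost:8081";

    /** Decides which action, if any, moves the flow to the desired state. */
    public static Optional<String> actionFor(String currentStatus, boolean wantRunning) {
        boolean running = "started".equals(currentStatus.trim());
        if (running == wantRunning) {
            return Optional.empty();             // nothing to do, skip /trigger
        }
        return Optional.of(wantRunning ? "start" : "stop");
    }

    /** Sends a GET request and returns the response body as text. */
    static String get(HttpClient client, String path) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(BASE_URL + path)).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    /** Asks /status first and only calls /trigger when the state has to change. */
    public static String ensureState(HttpClient client, boolean wantRunning) throws Exception {
        String status = get(client, "/status");
        Optional<String> action = actionFor(status, wantRunning);
        if (action.isPresent()) {
            return get(client, "/trigger?action=" + action.get());
        }
        return status.trim();
    }
}
```

With the example application running, FlowClient.ensureState(HttpClient.newHttpClient(), true) would start the processFlow only if /status reports stopped. Note that another client could still change the state between the two calls, so the invalid response has to be handled anyway.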

processFlow
[Screenshot: processFlow]

The processFlow contains a simple Set Payload transformer in a Poll scope that sets the payload to ‘The processFlow is now running!’ every 5 seconds.
In a real scenario this would be the logic that you want to start and stop, triggered for example by a task scheduling system or an external batch job.

Example console/logfiles
This is what it looks like in the logs/console:
INFO 2016-05-17 15:11:24,658 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.construct.FlowConstructLifecycleManager: Starting flow: processFlow
INFO 2016-05-17 15:11:24,659 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.processor.SedaStageLifecycleManager: Starting service: processFlow.stage1
INFO 2016-05-17 15:11:24,672 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.transport.polling.MessageProcessorPollingConnector: Registering listener: processFlow on endpointUri: polling://-1912630717
INFO 2016-05-17 15:11:24,689 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'null'. Object is: MessageProcessorPollingMessageReceiver
INFO 2016-05-17 15:11:24,703 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising Bean: polling://processFlow/2085552168
INFO 2016-05-17 15:11:24,704 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Starting Bean: polling://processFlow/2085552168
INFO 2016-05-17 15:11:24,706 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.transport.polling.MessageProcessorPollingMessageReceiver: Connecting clusterizable message receiver
INFO 2016-05-17 15:11:24,707 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'null'. Object is: MessageProcessorPollingMessageReceiver
INFO 2016-05-17 15:11:24,708 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.transport.polling.MessageProcessorPollingMessageReceiver: Starting clusterizable message receiver
INFO 2016-05-17 15:11:29,719 [[flow_trigger_start-stop].processFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: The processFlow is now running!
INFO 2016-05-17 15:11:34,711 [[flow_trigger_start-stop].processFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: The processFlow is now running!
INFO 2016-05-17 15:11:39,712 [[flow_trigger_start-stop].processFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: The processFlow is now running!
INFO 2016-05-17 15:11:44,709 [[flow_trigger_start-stop].processFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: The processFlow is now running!
INFO 2016-05-17 15:11:49,712 [[flow_trigger_start-stop].processFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: The processFlow is now running!
INFO 2016-05-17 15:11:54,709 [[flow_trigger_start-stop].processFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: The processFlow is now running!
INFO 2016-05-17 15:11:59,708 [[flow_trigger_start-stop].processFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: The processFlow is now running!
INFO 2016-05-17 15:12:00,023 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.construct.FlowConstructLifecycleManager: Stopping flow: processFlow
INFO 2016-05-17 15:12:00,024 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.transport.polling.MessageProcessorPollingConnector: Removing listener on endpointUri: polling://-1912630717
INFO 2016-05-17 15:12:00,027 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Stopping: 'null'. Object is: MessageProcessorPollingMessageReceiver
INFO 2016-05-17 15:12:00,028 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Stopping Bean: polling://processFlow/2085552168
INFO 2016-05-17 15:12:00,034 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Disposing: 'null'. Object is: MessageProcessorPollingMessageReceiver
INFO 2016-05-17 15:12:00,035 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Disposing Bean: polling://processFlow/2085552168
INFO 2016-05-17 15:12:00,038 [[flow_trigger_start-stop].HTTP_Listener_Configuration.worker.01] org.mule.processor.SedaStageLifecycleManager: Stopping service: processFlow.stage1

Complete code example
