Build a geocoding applicaton with Python
Introduction
When it comes to building business process applications, coordinating all parts of the applicaton from user interaction to API calls can become very complex and unreliable.
Temporal helps shield you from these issues by providing relability and operability.
For more detail on how Temporal can help business process applicatons, please see Temporal's Chief Product Officer's discussion in The Top 3 Use Cases for Temporal (ft. Temporal PMs).
In this tutorial, you'll build a business process application asks the user for an API key and a location, and the program will geocode it using GeoApify's api to get a latitude and longitude for the user's address.
Prerequisites
Before starting this tutorial:
- Complete the Hello World tutorial
- Install requests (tested with version 2.32.3)
- Get a Geoapify API key
pip install requests
As mentioned in the Hello World tutorial, please be sure your
temporal server is running, using temporal server start-dev
.
If you're having trouble, sometimes closing that terminal and
restarting it can help.
Now that you have your environment ready, it's time to build an invincible geocoder.
Develop a Workflow to orchestrate your interactions with the user and API
The workflow is the side-effect-free function that orchestrates the activities (which are atomic, have potential for side effects, and are perhaps non-deterministic).
In this case, our actions are (1) getting information from the user and (2) querying the API.
Our workflow needs to orchestrate those by executing them in order.
Let's do it. First, create a new file called workflow.py
and add
the following code:
from datetime import timedelta
from temporalio import workflow
# Import activity, passing it through the sandbox without reloading the module
with workflow.unsafe.imports_passed_through():
from activities import get_address_from_user, get_api_key_from_user, get_lat_long, QueryParams
_TIMEOUT_5_MINS=5*60
@workflow.defn
class GeoCode:
@workflow.run
async def run(self) -> list:
api_key_from_user = await workflow.execute_activity(
get_api_key_from_user, start_to_close_timeout=timedelta(seconds=_TIMEOUT_5_MINS)
)
address_from_user = await workflow.execute_activity(
get_address_from_user, start_to_close_timeout=timedelta(seconds=_TIMEOUT_5_MINS)
)
query_params = QueryParams(api_key=api_key_from_user, address=address_from_user)
lat_long = await workflow.execute_activity(
get_lat_long, query_params, start_to_close_timeout=timedelta(seconds=_TIMEOUT_5_MINS)
)
return lat_long
Again, this is where the orchestration is happening. As you can see, there is one method in the workflow definition (more on that below), and there are multiple calls to activities within that workflow. This method is coordinating all the atomic activities that are non-deterministic and prone to fail. These activities are explained in the next section.
The GeoCode
class is decorated with the @workflow.defn
which
must be set on any registered Workflow class.
The async def run()
function is decorated with the @workflow.run
which is set on the (exactly)
one asynchronous method on the same class as the @workflow.defn
. As alluded above,
there can be more than one method here, but exactly one of them needs
to be decorated with @workflow.run
.
Inside the calls to workflow.execute_activity()
, pass the activity.
If that activity takes arguments, then pass those in after the activity:
workflow.execute_activity(activity, args*, ...)
. It is generally
recommended to collapse the arguments into a single one using a dataclass,
which we have here as QueryParams
(it will be defined in the next section).
With the skeleton in place, we can now develop the activities.
Develop Activities to interact with the user and the API
In this section, you'll implement the activities that interact with the outside world. The workflow is doing the orchestration, and the activities are doing the atomic actions.
We'll start small and add the following to a new file called activities.py
:
from temporalio import activity
@activity.defn
async def get_api_key_from_user() -> str:
return input("Please give your API key: ")
@activity.defn
async def get_address_from_user() -> str:
return input("Please give an address: ")
These are pretty simple. They're just two functions that get input from the user -- namely the api key and the address -- and return the values to the caller. The caller is the Workflow you made in the previous section, so these activities are returning their results to the Workflow.
The @activity.defn
decorator is what tells the Temporal client
(explained in a few sections) that this function is a Temporal
Activity.
These activities are the first two that are
called in the Workflow from the previous section. After the workflow
calls these two, it has the user's api key and address. Next,
it calls an activity called get_lat_long
, with an argument of
type QueryParams
. Now is the time to implement those. Add the following
to the activities.py
file that you just made:
import requests
from dataclasses import dataclass
@dataclass
class QueryParams:
api_key: str
address: str
@activity.defn
async def get_lat_long(query_params: QueryParams) -> list:
base_url = "https://api.geoapify.com/v1/geocode/search"
params = {
"text": query_params.address,
"apiKey": query_params.api_key
}
response = requests.get(base_url, params=params, timeout=1000)
response_json = response.json()
lat_long = response_json['features'][0]['geometry']['coordinates']
return lat_long
As mentioned before, the argument to workflows and activities are
recommended to be dataclasses. In this case, the activity is
an API call that needs the user's location and api key, so we'll
bundle those as a data class. That's QueryParams
.
Next, we need to define the Activity. As before, we need to
decorate the function with @activity.defn
. The function accepts
a QueryParams
object, calls the Geoapify api, and returns the coordinates
from the result. The Workflow receives this resulting lattitude and longitude
and assigns the variable response_from_geoapify
to them.
The next two steps put these plans into action. You'll make and run a worker, and then you'll run the workflow you made in section 1.
Create a Worker to host your Workflow and Activities
The last conceptual piece you need is a worker. The worker
is the process that connects to the Temporal Service, and listens
on a certain task queue for any work to do. Here is how you can make
a worker factory. Make a new file called make_worker.py
and
enter the following
from temporalio.client import Client
from temporalio.worker import Worker
from activities import get_address_from_user, get_api_key_from_user, get_lat_long
from workflow import GeoCode
def make_worker(client: Client):
worker = Worker(
client,
task_queue="geocode-task-queue",
workflows=[GeoCode],
activities=[get_address_from_user, get_api_key_from_user, get_lat_long]
)
return worker
The argument, client
, is the connection to the Temporal Service.
The task_queue
is the task queue that the Worker listens on (later,
when we run the workflow, we'll put items on that task queue). It also
accepts a workflows
argument. This is the list of Workflows it can
process. Lastly, it accepts a list of Activities.
Now that you have a way to make a Worker, it's time to connect to the
Temporal Service, use that function to make a Worker, and run the Worker.
You'll do this in a new file called run_worker.py
.
import asyncio
from temporalio.client import Client
from make_worker import make_worker
async def main():
client = await Client.connect("localhost:7233", namespace="default")
worker = make_worker(client)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
This code connects to the Temporal Service using Client.connect
.
You'll run this code soon, and when you do, the Temporal Service
will need to be running for this line to work (which is why it was mentioned in the
prerequisites).
Next, the code passes that client
into the make_worker
function
we made just above. This gives us a Worker, which we use to
call the .run()
method. This is what makes the Worker run
and start listening for work on the task queue.
At this point, you can open a new terminal (keep the service running
in a different terminal). Navigate to the project directory, and run
python run_worker.py
(it won't output anything yet).
It will start listening, but it has nothing
to do yet because there is nothing on the queue. And that's exactly
what the next section is for. In it, we will run our Workflow,
which will put tasks on the queue for the Worker.
Run the Workflow to execute the business process
This is the last step. The process will soon run.
The last thing you need to do is execute the workflow. To do this,
you need to connect to the Temporal Service (again, with Client.connect
),
and then call the .execute_workflow()
method, as shown below. Enter
the following code in a new file called run_workflow.py
.
import asyncio
from workflow import GeoCode
from temporalio.client import Client
async def main():
# Create a client connected to the server at the given address
client = await Client.connect("localhost:7233")
# Execute a workflow
lat_long = await client.execute_workflow(
GeoCode.run, id="geocode-workflow", task_queue="geocode-task-queue"
)
print(f"Lat long: {lat_long}")
if __name__ == "__main__":
asyncio.run(main())
In this piece, we simply connect to the service, then call execute_workflow
.
As you can see, it has a return value that it puts in lat_long
. This
return value comes from the return value of the Workflow definition.
The arguments are the following:
- The Workflow method that was decorated with
@workflow.defn
. - The id for the Workflow. This is shown in the WebUI to help you identify which Workflow is which.
- The task queue. This is the queue to which the workflow (and its activities) are added.
After that, the code simply prints the return value.
You're ready to run the code. Open a third terminal (the other
two processes should still be running), navigate to the project
directory, and run python run_workflow.py
. At this point, the
application is officially running. If you look at the terminal
that is running the Worker (not the terminal that is running the Workflow),
it should be asking you for your
api key. You should enter the Geoapify api key mentioned in
the prerequisites. Next, it will ask you for an address. Try
whatever you want. It will then query the Geoapify API to
geocode the address, and it will return the lattitude and longitude
of the address to the terminal running the Workflow.
Conclusion
You have built a business process application that runs invincibly with Temporal.
Next Steps
Now on your own, try defining a retry policy and applying it to the activities.