pyspark.SparkContext.addJobTag#

SparkContext.addJobTag(tag)[source]#

Add a tag to be assigned to all the jobs started by this thread.

Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group tag. The application can use SparkContext.cancelJobsWithTag() to cancel all running executions with this tag.

There may be multiple tags present at the same time, so different parts of application may use different tags to perform cancellation at different levels of granularity.

New in version 3.5.0.

Parameters
tagstr

The tag to be added. Cannot contain ‘,’ (comma) character.

Examples

>>> import threading
>>> from time import sleep
>>> from pyspark import InheritableThread
>>> sc.setInterruptOnCancel(interruptOnCancel=True)
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
...     sleep(100)
...     raise RuntimeError("Task should have been cancelled")
...
>>> def start_job(x):
...     global result
...     try:
...         sc.addJobTag("job_to_cancel")
...         result = sc.parallelize(range(x)).map(map_func).collect()
...     except Exception as e:
...         result = "Cancelled"
...     lock.release()
...
>>> def stop_job():
...     sleep(5)
...     sc.cancelJobsWithTag("job_to_cancel")
...
>>> suppress = lock.acquire()
>>> suppress = InheritableThread(target=start_job, args=(10,)).start()
>>> suppress = InheritableThread(target=stop_job).start()
>>> suppress = lock.acquire()
>>> print(result)
Cancelled
>>> sc.clearJobTags()