I'm using Flask 1.0.2 with Python 3.6 on Ubuntu 18.04. My app should use asyncio and asyncio.create_subprocess_exec() to launch a background script, read stdout from it, and then return a status when the script is done.
I am basically trying to implement an answer from this post: Non-blocking read on a subprocess.PIPE in python
The script launches successfully and I get all of the expected output from it, but the coroutine never returns (that is, the "Killing subprocess now" line is never reached). When I check the process list (ps) from the Linux terminal, the background script has exited.
What am I doing wrong and how can I successfully break out of the async for line in process.stdout loop?
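For reference, here is a minimal, self-contained sketch of the pattern described above, stripped of Flask and of the training script. It rests on assumptions that are not in the original code: it uses asyncio.run(), which needs Python 3.7 or newer (the post uses 3.6), and the child command is a hypothetical stand-in for s.py that prints two lines and exits. The loop over process.stdout ends only at end-of-file, which a pipe reports once every process holding its write end has closed it. Whether that is relevant to the hang described here is not established.

```python
import asyncio
import sys
from asyncio.subprocess import PIPE


async def read_lines_and_wait(*cmd):
    """Spawn cmd, collect its stdout lines, and return (lines, returncode)."""
    process = await asyncio.create_subprocess_exec(*cmd, stdout=PIPE)
    lines = []
    # This loop finishes when stdout reaches EOF, i.e. once every process
    # that holds the write end of the pipe has closed it.
    async for raw in process.stdout:
        lines.append(raw.decode().rstrip())
    # The child has closed stdout; wait() collects its exit status.
    returncode = await process.wait()
    return lines, returncode


def run_child_demo():
    # Hypothetical stand-in for s.py: prints two lines and exits with 0.
    child = [sys.executable, "-c", "print('step 1'); print('step 2')"]
    # asyncio.run() creates a fresh event loop and closes it afterwards.
    return asyncio.run(read_lines_and_wait(*child))


if __name__ == "__main__":
    print(run_child_demo())
```

With a child this simple the loop exits as soon as the child does, so the sketch only shows the expected control flow, not the failing case.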
At the top of my file after my imports I create my event loop:
# Create a loop to run all the tasks in.
global eventLoop ; asyncio.set_event_loop(None)
eventLoop = asyncio.new_event_loop()
asyncio.get_child_watcher().attach_loop(eventLoop)
I define my async coroutine above my route:
async def readAsyncFunctionAndKill(cmd):
    # Use global event loop
    global eventLoop
    print("[%s] Starting async Training Script ..." % (os.path.basename(__file__)))
    process = await asyncio.create_subprocess_exec(cmd,stdout=PIPE,loop=eventLoop)
    print("[%s] Starting to read stdout ..." % (os.path.basename(__file__)))
    async for line in process.stdout:
        line = line.decode(locale.getpreferredencoding(False))
        print("%s"%line, flush=True)
    print("[%s] Killing subprocess now ..." % (os.path.basename(__file__)))
    process.kill()
    print("[%s] Training process return code was: %s" % (os.path.basename(__file__), process.returncode))
    return await process.wait() # wait for the child process to exit
And my (abbreviated) route is here:
@app.route("/train_model", methods=["GET"])
def train_new_model():
    # Use global event loop
    global eventLoop
    with closing(eventLoop):
        eventLoop.run_until_complete(readAsyncFunctionAndKill("s.py"))
    return jsonify("done"), 200
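One side effect of wrapping a shared loop in closing() can be shown in isolation. The sketch below is an illustration, not the application code: fake_training and handle_request are hypothetical stand-ins for the coroutine and the route body. It shows that once a loop has been closed, a second run_until_complete() call on it raises RuntimeError, so a route written this way could serve at most one request per process.

```python
import asyncio
from contextlib import closing


async def fake_training():
    # Hypothetical stand-in for the real coroutine.
    await asyncio.sleep(0)
    return "done"


def handle_request(loop):
    # Mirrors the route body: the loop is closed when the with-block exits.
    coro = fake_training()
    try:
        with closing(loop):
            return loop.run_until_complete(coro)
    finally:
        # Avoids a "never awaited" warning when the loop refuses to run it.
        coro.close()


def demo():
    shared_loop = asyncio.new_event_loop()
    first = handle_request(shared_loop)       # works, then closes the loop
    try:
        second = handle_request(shared_loop)  # the loop is already closed
    except RuntimeError as err:
        second = str(err)
    return first, second, shared_loop.is_closed()


if __name__ == "__main__":
    print(demo())
```

Creating a new loop per call and closing it afterwards, or never closing the shared one, avoids this. Which of the two fits a Flask worker better depends on how the app is deployed.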
The "s.py" script being called is marked as executable and is in the same working directory. The abbreviated script is shown here (it launches several subprocesses and instantiates PyTorch classes):
def main():
    # Ensure that swap is activated since we don't have enough RAM to train our model otherwise
    print("[%s] Activating swap now ..." % (os.path.basename(__file__)))
    subprocess.call("swapon -a", shell=True)
    # Need to initialize GPU
    print("[%s] Initializing GPU ..." % (os.path.basename(__file__)))
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    defaults.device = torch.device("cuda")
    with torch.cuda.device(0):
        torch.tensor([1.]).cuda()
    print("[%s] Cuda is Available: %s - with Name: %s ..." % (os.path.basename(__file__),torch.cuda.is_available(),torch.cuda.get_device_name(0)))
    try:
        print("[%s] Beginning to train new model and replace existing model ..." % (os.path.basename(__file__)))
        # Batch size
        bs = 16
        #bs = 8
        # Create ImageBunch
        tfms = get_transforms(do_flip=True,
                              flip_vert=True,
                              max_rotate=180.,
                              max_zoom=1.1,
                              max_lighting=0.5,
                              max_warp=0.1,
                              p_affine=0.75,
                              p_lighting=0.75)
        # Create databunch using folder names as class names
        # This also applies the transforms and batch size to the data
        os.chdir(TRAINING_DIR)
        data = ImageDataBunch.from_folder("TrainingData", ds_tfms=tfms, train='.', valid_pct=0.2, bs=bs)
        ...
        # Create a new learner with an early stop callback
        learn = cnn_learner(data, models.resnet18, metrics=[accuracy], callback_fns=[
            partial(EarlyStoppingCallback, monitor='accuracy', min_delta=0.01, patience=3)])
        ...
        print("[%s] All done training ..." % (os.path.basename(__file__)))
        # Success
        sys.exit(0)
    except Exception as err:
        print("[%s] Error training model [ %s ] ..." % (os.path.basename(__file__),err))
        sys.exit(255)

if __name__== "__main__":
    main()
Are you sure the s.py script actually ends or exits without error if you run it yourself from the command line?
Why are you closing the loop after run_until_complete()? You never re-open the loop. You really don't need to manage the loop to this extent, however.
Is s.py made executable? What is the current working directory when you try to run this, does it match the directory where s.py is located?
Why not use print("[%s] Starting async Training Script ..." % (__name__,)) instead of extracting the base name each time? Or better, use the logging module and logging.getLogger(__name__) to attach the current module name to the logged records?
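The logging suggestion in the last comment could look roughly like the sketch below. The helper name make_logger, the format string, and the in-memory stream used for the demonstration are assumptions made for illustration. In a real module the logger would normally be created once with logging.getLogger(__name__) and write to stdout or a file.

```python
import io
import logging


def make_logger(name, stream):
    """Return a logger that prefixes every record with the logger name."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo output out of the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("[%(name)s] %(message)s"))
    logger.handlers.clear()   # avoid duplicate handlers on repeated calls
    logger.addHandler(handler)
    return logger


def demo():
    buffer = io.StringIO()
    # In a module this would be make_logger(__name__, sys.stdout).
    log = make_logger("s", buffer)
    log.info("Starting async Training Script ...")
    return buffer.getvalue()


if __name__ == "__main__":
    print(demo(), end="")
```

The prefix then comes from the formatter, so the individual calls no longer need to build it with os.path.basename(__file__).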