Project

General

Profile

« Previous | Next » 

Revision 3be2b3aa

Added by Vincent MEMBRÉ about 7 years ago

Fixes #10114: Allow relay-api to make asynchronous remote run call with output

View differences:

rudder-server-relay/SOURCES/relay-api/relay_api/remote_run.py
import requests
from subprocess import Popen, PIPE, STDOUT
from flask import Flask, Response
import itertools
# disable ssl warning on rudder connection in all the possible ways
try:
......
except:
pass
from pprint import pprint
NEXTHOP = None
REMOTE_RUN_COMMAND = "sudo /opt/rudder/bin/rudder remote run"
......
if keep_output:
process = Popen(command, shell=True, stdout=PIPE, stderr=STDOUT)
output = "".join([prefix + ":" + line for line in process.stdout.readlines()])
retval = process.wait()
if not asynchronous:
output = "".join([prefix + ":" + line for line in process.stdout.readlines()])
process.wait()
else:
def stream():
for line in iter(process.stdout.readline,''):
yield line.rstrip()+"\n"
output=stream()
else:
output = ""
process = Popen(command, shell=True)
if not asynchronous:
process.wait()
return output
def make_api_call(host, nodes, all_nodes, classes, keep_output, asynchronous):
......
if nodes:
data["nodes"] = ",".join(nodes)
req = requests.post(url, data=data, verify=False)
req = requests.post(url, data=data, verify=False, stream = asynchronous)
if req.status_code == 200:
return req.text
response = ""
if asynchronous:
def stream():
for content in req.iter_content():
yield content
response = stream()
else:
return req.text
else:
raise ValueError("Upstream Error: " + req.text)
......
NEXTHOP = get_next_hop(local_nodes, my_uuid)
def generate_output():
result = []
# Pass the call to sub relays
for relay in get_next_relays(NEXTHOP):
host = resolve_hostname(local_nodes, relay)
if all_nodes:
yield make_api_call(host, None, all_nodes, classes, keep_output, asynchronous)
result.append(make_api_call(host, None, all_nodes, classes, keep_output, asynchronous))
else:
relay_nodes = get_relay_nodes(NEXTHOP, relay, nodes)
if relay_nodes:
yield make_api_call(host, get_relay_nodes(NEXTHOP, relay, nodes), all_nodes, classes, keep_output, asynchronous)
result.append(make_api_call(host, get_relay_nodes(NEXTHOP, relay, nodes), all_nodes, classes, keep_output, asynchronous))
# Call directly managed nodes when needed
if all_nodes:
local_nodes_to_call = get_all_my_nodes(NEXTHOP)
......
local_nodes_to_call = get_my_nodes(NEXTHOP, nodes)
for node in local_nodes_to_call:
host = resolve_hostname(local_nodes, node)
yield call_remote_run(host, node, classes, keep_output, asynchronous)
result.append( call_remote_run(host, node, classes, keep_output, asynchronous))
return result
# depending on whether we want an asynchronous result we need to do something different with the output
if asynchronous:
# An async response, we produce generators that will be the result of our various async calls
# We just need to chain them
# Maybe we could mix them? Don't know how (pipe? output stream?)
response = itertools.chain(* generate_output())
else:
# A synchronous response: we already have the final output and just join it
response = "\n".join(generate_output())
return Response(generate_output())
return Response(response )

Also available in: Unified diff