添加spark serverless支持

This commit is contained in:
pengluan 2022-07-19 14:55:28 +08:00
parent 2a48da93e2
commit 9952166f6f

View File

@ -169,13 +169,16 @@ def launch_sparkjob(name, **kwargs):
while True: while True:
time.sleep(10) time.sleep(10)
sparkjob = k8s_client.get_one_crd(group=crd_info['group'], version=crd_info['version'],plural=crd_info['plural'], namespace=KFJ_NAMESPACE, name=name) sparkjob = k8s_client.get_one_crd(group=crd_info['group'], version=crd_info['version'],plural=crd_info['plural'], namespace=KFJ_NAMESPACE, name=name)
if sparkjob and (sparkjob['status'] == "Succeeded" or sparkjob['status'] == "Failed"):
if sparkjob:
status = json.loads(sparkjob['status_more']).get('applicationState', {}).get("state", '').upper()
if status=='COMPLETED' or 'FAILED' in status:
break break
sparkjob = k8s_client.get_one_crd(group=crd_info['group'],version=crd_info['version'],plural=crd_info['plural'],namespace=KFJ_NAMESPACE,name=name) sparkjob = k8s_client.get_one_crd(group=crd_info['group'],version=crd_info['version'],plural=crd_info['plural'],namespace=KFJ_NAMESPACE,name=name)
print("sparkjob %s finished, status %s"%(name, sparkjob['status'])) print("sparkjob %s finished, status %s"%(name, sparkjob['status_more']))
status = json.loads(sparkjob['status_more']).get('applicationState', {}).get("state", '').upper()
if sparkjob['status']!='Succeeded': if 'FAILED' in status:
exit(1) exit(1)
@ -200,7 +203,7 @@ if __name__ == "__main__":
sparkConf = [[x.split('=')[0], x.split('=')[1]] for x in args['sparkConf'].split('\n') if '=' in x] sparkConf = [[x.split('=')[0], x.split('=')[1]] for x in args['sparkConf'].split('\n') if '=' in x]
args['sparkConf'] = dict(zip([x[0] for x in sparkConf],[x[1] for x in sparkConf])) args['sparkConf'] = dict(zip([x[0] for x in sparkConf],[x[1] for x in sparkConf]))
args['sparkConf']['spark.driver.bindAddress']='0.0.0.0' # args['sparkConf']['spark.driver.bindAddress']='0.0.0.0' # k8s模式下不能用
hadoopConf = [[x.split('=')[0], x.split('=')[1]] for x in args['hadoopConf'].split('\n') if '=' in x] hadoopConf = [[x.split('=')[0], x.split('=')[1]] for x in args['hadoopConf'].split('\n') if '=' in x]
args['hadoopConf'] = dict(zip([x[0] for x in hadoopConf], [x[1] for x in hadoopConf])) args['hadoopConf'] = dict(zip([x[0] for x in hadoopConf], [x[1] for x in hadoopConf]))