From 9952166f6f052ff3638a6713138f18feacc4f7bf Mon Sep 17 00:00:00 2001 From: pengluan Date: Tue, 19 Jul 2022 14:55:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0spark=20serverless=E6=94=AF?= =?UTF-8?q?=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- job-template/job/spark/launcher.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/job-template/job/spark/launcher.py b/job-template/job/spark/launcher.py index a92fec01..07e01bd8 100644 --- a/job-template/job/spark/launcher.py +++ b/job-template/job/spark/launcher.py @@ -169,13 +169,16 @@ def launch_sparkjob(name, **kwargs): while True: time.sleep(10) sparkjob = k8s_client.get_one_crd(group=crd_info['group'], version=crd_info['version'],plural=crd_info['plural'], namespace=KFJ_NAMESPACE, name=name) - if sparkjob and (sparkjob['status'] == "Succeeded" or sparkjob['status'] == "Failed"): - break + + if sparkjob: + status = json.loads(sparkjob['status_more']).get('applicationState', {}).get("state", '').upper() + if status=='COMPLETED' or 'FAILED' in status: + break sparkjob = k8s_client.get_one_crd(group=crd_info['group'],version=crd_info['version'],plural=crd_info['plural'],namespace=KFJ_NAMESPACE,name=name) - print("sparkjob %s finished, status %s"%(name, sparkjob['status'])) - - if sparkjob['status']!='Succeeded': + print("sparkjob %s finished, status %s"%(name, sparkjob['status_more'])) + status = json.loads(sparkjob['status_more']).get('applicationState', {}).get("state", '').upper() + if 'FAILED' in status: exit(1) @@ -200,7 +203,7 @@ if __name__ == "__main__": sparkConf = [[x.split('=')[0], x.split('=')[1]] for x in args['sparkConf'].split('\n') if '=' in x] args['sparkConf'] = dict(zip([x[0] for x in sparkConf],[x[1] for x in sparkConf])) - args['sparkConf']['spark.driver.bindAddress']='0.0.0.0' + # args['sparkConf']['spark.driver.bindAddress']='0.0.0.0' # k8s模式下不能用 hadoopConf = [[x.split('=')[0], x.split('=')[1]] for x in args['hadoopConf'].split('\n') if '=' in x] args['hadoopConf'] = dict(zip([x[0] for x in hadoopConf], [x[1] for x in hadoopConf]))