Source code for cngi.direct.framework

#  CASA Next Generation Infrastructure
#  Copyright (C) 2021 AUI, Inc. Washington DC, USA
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
this module will be included in the api
"""
from dask.distributed import Client, LocalCluster

global_framework_client = None

########################
[docs]def InitializeFramework(workers=2, memory='8GB', processes=True, **kwargs): """ Initialize the CNGI framework This sets up the processing environment such that all future calls to Dask Dataframes, arrays, etc will automatically use the scheduler that is configured here. Parameters ---------- workers : int Number of processor cores to use, Default=2 memory : str Max memory allocated to each worker in string format. Default='8GB' processes : bool Whether to use processes (True) or threads (False), Default=True threads_per_worker : int Only used if processes = True. Number of threads per python worker process, Default=1 Returns ------- distributed.client.Client Client from Dask Distributed for use by Dask objects """ import socket if 'threads_per_worker' in kwargs.keys(): tpw = kwargs['threads_per_worker'] else: # enforce default of 1 thread per process tpw = 1 global global_framework_client if global_framework_client != None: global_framework_client.close() # set up a cluster object to pass into Client # for now, only supporting single machine cluster = LocalCluster(n_workers=workers, threads_per_worker=tpw, processes=processes, memory_limit=memory) global_framework_client = Client(cluster) print("Dask dashboard hosted at:") if processes == True: print(global_framework_client.cluster.dashboard_link) else: print(socket.gethostbyname(socket.gethostname()) + '/' + str(global_framework_client.scheduler_info()['services']['dashboard'])) return(global_framework_client)
#########################
[docs]def GetFrameworkClient(): """ Return the CNGI framework scheduler client Parameters ---------- Returns ------- distributed.client.Client Client from Dask Distributed for use by Dask objects """ return (global_framework_client)