Add demo for vertical federated learning (#9103)
This commit is contained in:
63
demo/nvflare/horizontal/README.md
Normal file
63
demo/nvflare/horizontal/README.md
Normal file
@@ -0,0 +1,63 @@
|
||||
# Experimental Support of Horizontal Federated XGBoost using NVFlare
|
||||
|
||||
This directory contains a demo of Horizontal Federated Learning using
|
||||
[NVFlare](https://nvidia.github.io/NVFlare/).
|
||||
|
||||
## Training with CPU only
|
||||
|
||||
To run the demo, first build XGBoost with the federated learning plugin enabled (see the
|
||||
[README](../../plugin/federated/README.md)).
|
||||
|
||||
Install NVFlare (note that currently NVFlare only supports Python 3.8):
|
||||
```shell
|
||||
pip install nvflare
|
||||
```
|
||||
|
||||
Prepare the data:
|
||||
```shell
|
||||
./prepare_data.sh
|
||||
```
|
||||
|
||||
Start the NVFlare federated server:
|
||||
```shell
|
||||
/tmp/nvflare/poc/server/startup/start.sh
|
||||
```
|
||||
|
||||
In another terminal, start the first worker:
|
||||
```shell
|
||||
/tmp/nvflare/poc/site-1/startup/start.sh
|
||||
```
|
||||
|
||||
And the second worker:
|
||||
```shell
|
||||
/tmp/nvflare/poc/site-2/startup/start.sh
|
||||
```
|
||||
|
||||
Then start the admin CLI:
|
||||
```shell
|
||||
/tmp/nvflare/poc/admin/startup/fl_admin.sh
|
||||
```
|
||||
|
||||
In the admin CLI, run the following command:
|
||||
```shell
|
||||
submit_job horizontal-xgboost
|
||||
```
|
||||
|
||||
Once the training finishes, the model file should be written into
|
||||
`/tmp/nvflare/poc/site-1/run_1/test.model.json` and `/tmp/nvflare/poc/site-2/run_1/test.model.json`
|
||||
respectively.
|
||||
|
||||
Finally, shut down everything from the admin CLI, using `admin` as password:
|
||||
```shell
|
||||
shutdown client
|
||||
shutdown server
|
||||
```
|
||||
|
||||
## Training with GPUs
|
||||
|
||||
To demo with Federated Learning using GPUs, make sure your machine has at least 2 GPUs.
|
||||
Build XGBoost with the federated learning plugin enabled along with CUDA, but with NCCL
|
||||
turned off (see the [README](../../plugin/federated/README.md)).
|
||||
|
||||
Modify `config/config_fed_client.json` and set `use_gpus` to `true`, then repeat the steps
|
||||
above.
|
||||
68
demo/nvflare/horizontal/custom/controller.py
Normal file
68
demo/nvflare/horizontal/custom/controller.py
Normal file
@@ -0,0 +1,68 @@
|
||||
"""
|
||||
Example of training controller with NVFlare
|
||||
===========================================
|
||||
"""
|
||||
import multiprocessing
|
||||
|
||||
from nvflare.apis.client import Client
|
||||
from nvflare.apis.fl_context import FLContext
|
||||
from nvflare.apis.impl.controller import Controller, Task
|
||||
from nvflare.apis.shareable import Shareable
|
||||
from nvflare.apis.signal import Signal
|
||||
from trainer import SupportedTasks
|
||||
|
||||
import xgboost.federated
|
||||
|
||||
|
||||
class XGBoostController(Controller):
    """NVFlare controller that hosts the XGBoost federated gRPC server.

    The federated server runs in a separate child process that is started in
    ``start_controller`` and stopped in ``stop_controller``. ``control_flow``
    broadcasts a single training task to the sites and waits for them.
    """

    # Upper bound, in seconds, on how long ``stop_controller`` waits for the
    # terminated server process to exit before giving up on reaping it.
    _SERVER_JOIN_TIMEOUT_SECONDS = 10

    def __init__(self, port: int, world_size: int, server_key_path: str,
                 server_cert_path: str, client_cert_path: str):
        """Controller for federated XGBoost.

        Args:
            port: the port for the gRPC server to listen on.
            world_size: the number of sites.
            server_key_path: the path to the server key file.
            server_cert_path: the path to the server certificate file.
            client_cert_path: the path to the client certificate file.
        """
        super().__init__()
        self._port = port
        self._world_size = world_size
        self._server_key_path = server_key_path
        self._server_cert_path = server_cert_path
        self._client_cert_path = client_cert_path
        # Handle of the child process running the federated server; None
        # while no server is running.
        self._server = None

    def start_controller(self, fl_ctx: FLContext):
        """Launch the XGBoost federated server in a child process."""
        self._server = multiprocessing.Process(
            target=xgboost.federated.run_federated_server,
            args=(self._port, self._world_size, self._server_key_path,
                  self._server_cert_path, self._client_cert_path))
        self._server.start()

    def stop_controller(self, fl_ctx: FLContext):
        """Terminate the federated server process, if one was started."""
        if self._server is not None:
            self._server.terminate()
            # Reap the child so it does not linger as a zombie process. The
            # wait is bounded so a child that is slow to exit cannot block
            # the caller indefinitely.
            self._server.join(timeout=self._SERVER_JOIN_TIMEOUT_SECONDS)
            self._server = None

    def process_result_of_unknown_task(self, client: Client, task_name: str,
                                       client_task_id: str, result: Shareable,
                                       fl_ctx: FLContext):
        """Log a warning for a result that belongs to an unknown task."""
        self.log_warning(fl_ctx, f"Unknown task: {task_name} from client {client.name}.")

    def control_flow(self, abort_signal: Signal, fl_ctx: FLContext):
        """Broadcast the training task to the sites and wait for responses.

        Returns early, without logging completion, if ``abort_signal`` is
        triggered before or after the broadcast.
        """
        self.log_info(fl_ctx, "XGBoost training control flow started.")
        if abort_signal.triggered:
            return
        task = Task(name=SupportedTasks.TRAIN, data=Shareable())
        self.broadcast_and_wait(
            task=task,
            # Every site takes part in training, so wait for all of them.
            min_responses=self._world_size,
            fl_ctx=fl_ctx,
            wait_time_after_min_received=1,
            abort_signal=abort_signal,
        )
        if abort_signal.triggered:
            return

        self.log_info(fl_ctx, "XGBoost training control flow finished.")
|
||||
90
demo/nvflare/horizontal/custom/trainer.py
Normal file
90
demo/nvflare/horizontal/custom/trainer.py
Normal file
@@ -0,0 +1,90 @@
|
||||
import os
|
||||
|
||||
from nvflare.apis.executor import Executor
|
||||
from nvflare.apis.fl_constant import FLContextKey, ReturnCode
|
||||
from nvflare.apis.fl_context import FLContext
|
||||
from nvflare.apis.shareable import Shareable, make_reply
|
||||
from nvflare.apis.signal import Signal
|
||||
|
||||
import xgboost as xgb
|
||||
from xgboost import callback
|
||||
|
||||
|
||||
class SupportedTasks:
    """Names of the tasks understood by the XGBoost trainer."""

    # The single task used by this demo: run federated training.
    TRAIN = "train"
|
||||
|
||||
|
||||
class XGBoostTrainer(Executor):
    """NVFlare executor that runs federated XGBoost training on one site."""

    def __init__(self, server_address: str, world_size: int, server_cert_path: str,
                 client_key_path: str, client_cert_path: str, use_gpus: bool):
        """Trainer for federated XGBoost.

        Args:
            server_address: address for the gRPC server to connect to.
            world_size: the number of sites.
            server_cert_path: the path to the server certificate file.
            client_key_path: the path to the client key file.
            client_cert_path: the path to the client certificate file.
            use_gpus: whether to train with the GPU tree method, using the
                site's rank as the GPU id.
        """
        super().__init__()
        self._server_address = server_address
        self._world_size = world_size
        self._server_cert_path = server_cert_path
        self._client_key_path = client_key_path
        self._client_cert_path = client_cert_path
        self._use_gpus = use_gpus

    def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext,
                abort_signal: Signal) -> Shareable:
        """Run the requested task and reply with a return code.

        Replies ``OK`` after a successful training run, ``TASK_UNKNOWN`` for
        any task other than ``SupportedTasks.TRAIN``, and
        ``EXECUTION_EXCEPTION`` if anything raised while handling the task.
        """
        self.log_info(fl_ctx, f"Executing {task_name}")
        try:
            # Reject anything that is not the training task up front.
            if task_name != SupportedTasks.TRAIN:
                self.log_error(fl_ctx, f"{task_name} is not a supported task.")
                return make_reply(ReturnCode.TASK_UNKNOWN)

            self._do_training(fl_ctx)
            return make_reply(ReturnCode.OK)
        except BaseException as e:
            self.log_exception(fl_ctx,
                               f"Task {task_name} failed. Exception: {e.__str__()}")
            return make_reply(ReturnCode.EXECUTION_EXCEPTION)

    def _do_training(self, fl_ctx: FLContext):
        """Train a model inside the federated communicator and save it.

        The model is written as ``test.model.json`` into the run directory
        obtained from the workspace object in ``fl_ctx``.
        """
        # The rank is the number after the first '-' in the client name,
        # shifted to be zero-based (presumably names look like "site-1").
        site_name = fl_ctx.get_prop(FLContextKey.CLIENT_NAME)
        rank = int(site_name.split('-')[1]) - 1

        federated_settings = {
            'xgboost_communicator': 'federated',
            'federated_server_address': self._server_address,
            'federated_world_size': self._world_size,
            'federated_rank': rank,
            'federated_server_cert': self._server_cert_path,
            'federated_client_key': self._client_key_path,
            'federated_client_cert': self._client_cert_path
        }
        with xgb.collective.CommunicatorContext(**federated_settings):
            # Load the data files; they are not sharded in federated mode.
            train_matrix = xgb.DMatrix('agaricus.txt.train')
            test_matrix = xgb.DMatrix('agaricus.txt.test')

            # Training parameters, named as in the C++ version.
            booster_params = {'max_depth': 2, 'eta': 1, 'objective': 'binary:logistic'}
            if self._use_gpus:
                self.log_info(fl_ctx, f'Training with GPU {rank}')
                booster_params.update({'tree_method': 'gpu_hist', 'gpu_id': rank})

            # Evaluation sets to monitor during training.
            eval_sets = [(test_matrix, 'eval'), (train_matrix, 'train')]
            boosting_rounds = 20

            # Run training; every feature of the training API is available.
            monitor = callback.EvaluationMonitor(rank=rank)
            booster = xgb.train(booster_params, train_matrix, boosting_rounds,
                                evals=eval_sets, early_stopping_rounds=2,
                                verbose_eval=False, callbacks=[monitor])

            # Save the model into this run's directory.
            workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
            run_number = fl_ctx.get_prop(FLContextKey.CURRENT_RUN)
            model_path = os.path.join(workspace.get_run_dir(run_number),
                                      "test.model.json")
            booster.save_model(model_path)
            xgb.collective.communicator_print("Finished training\n")
|
||||
25
demo/nvflare/horizontal/prepare_data.sh
Executable file
25
demo/nvflare/horizontal/prepare_data.sh
Executable file
@@ -0,0 +1,25 @@
|
||||
#!/bin/bash
#
# Prepare the horizontal federated XGBoost demo: generate certificates, split
# the agaricus data set across the sites, create the NVFlare POC workspace and
# copy the job, certificates and data into it.

set -e

# Remove the output of any previous run.
rm -fr ./agaricus* ./*.pem ./poc

# Number of federated sites. The split commands below use a one-character
# numeric suffix (-a 1), so this must stay a single digit.
world_size=2

# Generate server and client certificates.
openssl req -x509 -newkey rsa:2048 -days 7 -nodes -keyout server-key.pem -out server-cert.pem -subj "/C=US/CN=localhost"
openssl req -x509 -newkey rsa:2048 -days 7 -nodes -keyout client-key.pem -out client-cert.pem -subj "/C=US/CN=localhost"

# Split train and test files manually to simulate a federated environment.
split -n "l/${world_size}" --numeric-suffixes=1 -a 1 ../data/agaricus.txt.train agaricus.txt.train-site-
split -n "l/${world_size}" --numeric-suffixes=1 -a 1 ../data/agaricus.txt.test agaricus.txt.test-site-

# Create one POC site per data shard so the workspace matches world_size.
nvflare poc -n "${world_size}" --prepare
mkdir -p /tmp/nvflare/poc/admin/transfer/horizontal-xgboost
cp -fr config custom /tmp/nvflare/poc/admin/transfer/horizontal-xgboost
cp server-*.pem client-cert.pem /tmp/nvflare/poc/server/

# Give every site its certificates and its own shard of the data.
for ((id = 1; id <= world_size; id++)); do
  cp server-cert.pem client-*.pem /tmp/nvflare/poc/site-"$id"/
  cp agaricus.txt.train-site-"$id" /tmp/nvflare/poc/site-"$id"/agaricus.txt.train
  cp agaricus.txt.test-site-"$id" /tmp/nvflare/poc/site-"$id"/agaricus.txt.test
done
|
||||
Reference in New Issue
Block a user