Cluster container deployment
1. Test container cluster network
First, ensure that the two servers' networks can reach each other:
yum install telnet -y
telnet ip port
Start the container in host mode on both servers, so that it shares the network namespace with the host:
docker run --net=host -itd secretflow/secretflow-anolis8:0.7.11b2
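The steps below are executed inside the containers. One way to open a shell in the running container (assuming the standard docker CLI; the container ID comes from docker ps) is:
# find the container ID, then attach a shell
docker ps
docker exec -it <container_id> bash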
Test whether a port opened in server A's container can be reached from server B's container:
- Start the Ray head node service in server A's container first, then start the Ray child node in server B's container.
- If the child node can connect to the head node, the network is fine. If the connection fails, check whether the port is open and whether the two hosts can communicate (see the port-check sketch after this list).
- Start the Ray service on the master node:
RAY_DISABLE_REMOTE_CODE=true \
RAY_USE_TLS=0 \
ray start --head --node-ip-address="host machine ip" --port="GCS server listening port" --resources='{"alice": 8}' --include-dashboard=False --disable-usage-stats

# RAY_USE_TLS=0 turns off TLS verification
# {"alice": 8} means that alice can run at most 8 workers at the same time
# Example:
RAY_DISABLE_REMOTE_CODE=true \
RAY_USE_TLS=0 \
ray start --head --node-ip-address="10.10.10.111" --port="9937" --resources='{"alice": 8}' --include-dashboard=False --disable-usage-stats
- Start the Ray service on the child node:
RAY_DISABLE_REMOTE_CODE=true \
RAY_USE_TLS=0 \
ray start --address="master node ip:master node GCS port" --resources='{"bob": 8}' --disable-usage-stats

# Example:
RAY_DISABLE_REMOTE_CODE=true \
RAY_USE_TLS=0 \
ray start --address="10.10.10.111:9937" --resources='{"bob": 8}' --disable-usage-stats
- Check the node startup status with ray status: confirm that the node hashes reported on both sides are consistent and that the service is healthy:
ray status
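If the child node cannot join, a quick port check helps. A minimal sketch, assuming the example head-node address 10.10.10.111:9937 from above and that ss and firewalld are available on the host:
# on the master host: confirm the GCS port is listening
ss -lntp | grep 9937
# on the master host: list ports opened in the firewall (firewalld assumed)
firewall-cmd --list-ports
# from the child node's container: confirm the port is reachable
telnet 10.10.10.111 9937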
Optionally, start Jupyter
- Start Jupyter:
jupyter notebook --ip 0.0.0.0 --allow-root --port 9910
- Run jupyter in the background:
nohup jupyter notebook --ip 0.0.0.0 --allow-root --port 9922 > jupyter.log 2>&1 &
2. Test the BRPC communication port in the container
In the SecretFlow framework, SPU is built on BRPC, which means SPU has a communication network independent of the Ray network. In other words, the SPU ports have to be handled separately.
Before testing, first check that the BRPC port works: start the BRPC service on one of the servers:
import spu.binding._lib.link as spu_link

rank = 0
node = {
    'party': 'alice',
    'id': 'local:0',
    'address': '10.10.10.111:9001',  # The listen address of this node
}
desc = spu_link.Desc()
desc.add_party(node['id'], node['address'])
link = spu_link.create_brpc(desc, rank)
From the other server's container, check whether this port is reachable; skip the next step if it is:
telnet ip port
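If the port is not reachable, it may need to be opened on the host firewall (the container uses the host network). A hedged example assuming firewalld and the 9001 listen port used in the snippet above:
# run on the host, then re-test with telnet
firewall-cmd --zone=public --add-port=9001/tcp --permanent
firewall-cmd --reload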
3. Verify that the nodes have started
Check the node startup status with ray status: confirm that the node hashes reported on both sides are consistent and that the service is healthy:
ray status
To test in Python whether the nodes started successfully, pick any machine (here the alice machine is used), start python, and execute the following code line by line, pressing Enter after each line. The address parameter is the address of the head node (alice):
import secretflow as sf

sf.init(address='10.10.10.111:9937')
alice = sf.PYU('alice')
bob = sf.PYU('bob')
sf.reveal(alice(lambda x: x)(2))
sf.reveal(bob(lambda x: x)(2))
The PYU statement only declares the logical device for the alice machine:
alice = sf.PYU('alice')
When the alice PYU is called, Ray dispatches the task to the alice machine. In a cluster environment, the corresponding environment must be prepared in advance before the PYU can run:
sf.reveal(alice(lambda x : x)(2))
[Screenshot: resource/img.png]
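As an extra sanity check, and only as a minimal sketch built from the APIs already used in this guide (PYU calls, .to() and sf.reveal), data produced on one PYU can be moved to another PYU and computed on there:
# produce a value on alice, move it to bob, compute on bob, then reveal the result
x = alice(lambda: 3)()
y = bob(lambda v: v * 2)(x.to(bob))
print(sf.reveal(y))  # expected: 6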
4. Logistic regression test
SPU initialization can specify whether the computation is run by two or three computing parties; depending on the protocol, a three-party setup requires three machines.
- Computing nodes: the two or three machines that perform the SPU computation can be fixed as computing nodes.
- Data nodes: a data node can be a computing node or another partner node. There can be N data-provider nodes; their data is ultimately split into shares for the computation.
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import Normalizer
import secretflow as sf
import jax.numpy as jnp
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score
import socket
from contextlib import closing
from typing import List, Tuple, cast
import spu


def unused_tcp_port() -> int:
    """Return an unused port"""
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.bind(("", 0))
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return cast(int, sock.getsockname()[1])


# In the case of a three-party protocol, three nodes can be started for the computation
aby3_cluster_def = {
    'nodes': [
        {
            'party': 'alice',
            'id': 'local:0',
            'address': f'127.0.0.1:{unused_tcp_port()}',
        },
        {'party': 'bob', 'id': 'local:1', 'address': f'127.0.0.1:{unused_tcp_port()}'},
        {
            'party': 'carol',
            'id': 'local:2',
            'address': f'127.0.0.1:{unused_tcp_port()}',
        },
    ],
    'runtime_config': {
        'protocol': spu.spu_pb2.ABY3,
        'field': spu.spu_pb2.FM64,
        'enable_pphlo_profile': False,
        'enable_hal_profile': False,
        'enable_pphlo_trace': False,
        'enable_action_trace': False,
    },
}

semi2k_cluster_def = {
    'nodes': [
        {
            'party': 'alice',
            'id': 'alice:0',
            'address': '10.10.10.111:9938',
        },
        {'party': 'bob', 'id': 'bob:1', 'address': '10.10.10.115:9938'},
    ],
    'runtime_config': {
        'protocol': spu.spu_pb2.SEMI2K,
        'field': spu.spu_pb2.FM128,
        'enable_pphlo_profile': False,
        'enable_hal_profile': False,
        'enable_pphlo_trace': False,
        'enable_action_trace': False,
    },
}


def breast_cancer(party_id=None, train: bool = True) -> (np.ndarray, np.ndarray):
    scaler = Normalizer(norm='max')
    x, y = load_breast_cancer(return_X_y=True)
    x = scaler.fit_transform(x)
    x_train, x_test, y_train, y_test = train_test_split(
        x, y, test_size=0.2, random_state=42
    )
    if train:
        if party_id:
            if party_id == 1:
                return x_train[:, 15:], None
            else:
                return x_train[:, :15], y_train
        else:
            return x_train, y_train
    else:
        return x_test, y_test


# In case you have a running secretflow runtime already.
sf.shutdown()

sf.init(address='10.10.10.111:9937', log_to_driver=True)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
spu = sf.SPU(cluster_def=semi2k_cluster_def)
# spu = sf.SPU(cluster_def=aby3_cluster_def)

x1, _ = alice(breast_cancer)(party_id=1)
x2, y = bob(breast_cancer)(party_id=2)

device = spu

W = jnp.zeros((30,))
b = 0.0

W_, b_, x1_, x2_, y_ = (
    sf.to(device, W),
    sf.to(device, b),
    x1.to(device),
    x2.to(device),
    y.to(device),
)

from jax import value_and_grad


def sigmoid(x):
    return 1 / (1 + jnp.exp(-x))


# Outputs probability of a label being true.
def predict(W, b, inputs):
    return sigmoid(jnp.dot(inputs, W) + b)


# Training loss is the negative log-likelihood of the training examples.
def loss(W, b, inputs, targets):
    preds = predict(W, b, inputs)
    label_probs = preds * targets + (1 - preds) * (1 - targets)
    return -jnp.mean(jnp.log(label_probs))


def train_step(W, b, x1, x2, y, learning_rate):
    x = jnp.concatenate([x1, x2], axis=1)
    loss_value, Wb_grad = value_and_grad(loss, (0, 1))(W, b, x, y)
    W -= learning_rate * Wb_grad[0]
    b -= learning_rate * Wb_grad[1]
    return loss_value, W, b


def fit(W, b, x1, x2, y, epochs=1, learning_rate=1e-2):
    losses = jnp.array([])
    for _ in range(epochs):
        l, W, b = train_step(W, b, x1, x2, y, learning_rate=learning_rate)
        losses = jnp.append(losses, l)
    return losses, W, b


def plot_losses(losses):
    plt.plot(np.arange(len(losses)), losses)
    plt.xlabel('epoch')
    plt.ylabel('loss')


def validate_model(W, b, X_test, y_test):
    y_pred = predict(W, b, X_test)
    return roc_auc_score(y_test, y_pred)


losses, W_, b_ = device(
    fit,
    static_argnames=['epochs'],
    num_returns_policy=sf.device.SPUCompilerNumReturnsPolicy.FROM_USER,
    user_specified_num_returns=3,
)(W_, b_, x1_, x2_, y_, epochs=10, learning_rate=1e-2)

losses = sf.reveal(losses)
plot_losses(losses)

X_test, y_test = breast_cancer(train=False)
auc = validate_model(sf.reveal(W_), sf.reveal(b_), X_test, y_test)
print(f'auc={auc}')

plt.show()
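To run the whole test, the code above can be saved to a script (lr_test.py is just an illustrative name) and executed on the alice machine; after testing, the Ray services can be stopped on every node:
python lr_test.py   # run on the alice machine (the driver must be able to reach the head node)
ray stop            # run on each node to tear down the Ray cluster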