[dask] Small cleanup. (#6391)

This commit is contained in:
Jiaming Yuan 2020-11-14 22:15:05 +08:00 committed by GitHub
parent 4ccf92ea34
commit fcd6fad822
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -347,12 +347,11 @@ class DaskDMatrix:
def _get_worker_parts_ordered(meta_names, list_of_keys, list_of_parts, partition_order):
# List of partitions like: [(x3, y3, w3, m3, ..), ..], order is not preserved.
assert isinstance(list_of_parts, list)
list_of_parts_value = list_of_parts
result = []
for i, _ in enumerate(list_of_parts):
data = list_of_parts_value[i][0]
data = list_of_parts[i][0]
labels = None
weights = None
base_margin = None
@ -360,7 +359,7 @@ def _get_worker_parts_ordered(meta_names, list_of_keys, list_of_parts, partition
label_upper_bound = None
# Iterate through all possible meta info, brings small overhead as in xgboost
# there are constant number of meta info available.
for j, blob in enumerate(list_of_parts_value[i][1:]):
for j, blob in enumerate(list_of_parts[i][1:]):
if meta_names[j] == 'labels':
labels = blob
elif meta_names[j] == 'weights':
@ -701,7 +700,8 @@ async def _train_async(client,
dtrain.create_fn_args(workers[i]),
id(dtrain),
evals_per_worker,
pure=False)
pure=False,
workers=[worker_addr])
futures.append(f)
results = await client.gather(futures)