Skip to content
Open
1 change: 1 addition & 0 deletions tensorboard/webapp/metrics/effects/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ tf_ng_module(
"//tensorboard/webapp/metrics/data_source",
"//tensorboard/webapp/metrics/store",
"//tensorboard/webapp/types",
"//tensorboard/webapp/util:types",
"@npm//@angular/core",
"@npm//@ngrx/effects",
"@npm//@ngrx/store",
Expand Down
127 changes: 107 additions & 20 deletions tensorboard/webapp/metrics/effects/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ import {forkJoin, merge, Observable, of} from 'rxjs';
import {
catchError,
throttleTime,
combineLatestWith,
filter,
map,
mergeMap,
switchMap,
take,
tap,
withLatestFrom,
shareReplay,
} from 'rxjs/operators';
import * as routingActions from '../../app_routing/actions';
import {State} from '../../app_state';
Expand All @@ -42,13 +44,16 @@ import {
TagMetadata,
TimeSeriesRequest,
TimeSeriesResponse,
NonSampledPluginType,
} from '../data_source/index';
import {
getCardLoadState,
getCardMetadata,
getMetricsTagMetadataLoadState,
TagMetadata as StoreTagMetadata,
} from '../store';
import {CardId, CardMetadata} from '../types';
import {CardId, CardMetadata, PluginType} from '../types';
import {DeepReadonly} from '../../util/types';

export type CardFetchInfo = CardMetadata & {
id: CardId;
Expand All @@ -68,6 +73,29 @@ const getCardFetchInfo = createSelector(

const initAction = createAction('[Metrics Effects] Init');

function generateMultiRunTagsToEidMapping(
tagMetadata: DeepReadonly<StoreTagMetadata>,
runToEid: Record<string, string>
): Record<string, Set<string>> {
const tagToEid: Record<string, Set<string>> = {};
for (const pluginType in tagMetadata) {
if (isSingleRunPlugin(pluginType as PluginType)) {
continue;
}

Object.entries(
tagMetadata[pluginType as NonSampledPluginType].tagToRuns
).forEach(([tag, runIds]) => {
if (!tagToEid[tag]) {
tagToEid[tag] = new Set();
}
runIds.forEach((runId) => tagToEid[tag].add(runToEid[runId]));
});
}

return tagToEid;
}

@Injectable()
export class MetricsEffects implements OnInitEffects {
constructor(
Expand All @@ -76,6 +104,42 @@ export class MetricsEffects implements OnInitEffects {
private readonly dataSource: MetricsDataSource
) {}

/**
* Computes a record of tag to the experiments it appears in.
*
* The computation is done by translating Plugin -> Tag -> Run -> ExpId
*
* There are essentially 3 kinds of plugins but here we really only care about
* Single Run vs Multi Run
* 1) Non-sampled multi-run plugins
* 2) Non-sampled single-run plugins
* 3) Sampled single-run plugins
* Note: There are no sampled multi run plugins
*
* TensorBoard generates cards for the based on the Tag -> Run relationship it
* recieves from the `/timeseries/tags` response.
*
* As these cards become visible this effect is invoked to fetch data for the
* cards using the `/timeseries/timeSeries` endpoint. One request is made for each
* kind of data being shown for each experiment being viewed.
*
* Because runs can only be associated with a single experiment only a single
* request is required for single run plugin data.
*
* Multi run plugins can contain runs from multiple experiments (if they contain
* the same tag) and thus may require up to N requests where N is the number of
* experiments. This mapping from tag to experiment id is used to ensure we do
* not make unnecessary requests fetch data for experiments without the relevant tag.
*/
readonly multiRunTagsToEid$: Observable<Record<string, Set<string>>> =
this.store.select(selectors.getMetricsTagMetadata).pipe(
combineLatestWith(this.store.select(selectors.getRunIdToExperimentId)),
map(([tagMetadata, runToEid]) => {
return generateMultiRunTagsToEidMapping(tagMetadata, runToEid);
}),
shareReplay(1)
);

/** @export */
ngrxOnInitEffects(): Action {
return initAction();
Expand Down Expand Up @@ -193,25 +257,42 @@ export class MetricsEffects implements OnInitEffects {

private fetchTimeSeriesForCards(
fetchInfos: CardFetchInfo[],
experimentIds: string[]
experimentIds: string[],
multiRunTagsToEid: Record<string, Set<string>>
) {
/**
* TODO(psybuzz): if 2 cards require the same data, we should dedupe instead of
* making 2 identical requests.
*/
const requests: TimeSeriesRequest[] = fetchInfos.map((fetchInfo) => {
const {plugin, tag, runId, sample} = fetchInfo;
const partialRequest: TimeSeriesRequest = isSingleRunPlugin(plugin)
? {plugin, tag, runId: runId!}
: {plugin, tag, experimentIds};
if (sample !== undefined) {
partialRequest.sample = sample;
}
return partialRequest;
});
const requests = fetchInfos
.map((fetchInfo) => {
const {plugin, tag, runId, sample} = fetchInfo;

if (isSingleRunPlugin(plugin)) {
if (!runId) {
return;
}
return {
plugin,
tag,
runId,
sample,
};
}

const filteredEids = experimentIds.filter((eid) =>
multiRunTagsToEid[tag]?.has(eid)
);
if (!filteredEids.length) {
return;
}

return {plugin, tag, sample, experimentIds: filteredEids};
})
.filter(Boolean);

const uniqueRequests = Array.from(
new Set(requests.map((request) => JSON.stringify(request)))
).map((serialized) => JSON.parse(serialized) as TimeSeriesRequest);

// Fetch and handle responses.
return of(requests).pipe(
return of(uniqueRequests).pipe(
tap((requests) => {
this.store.dispatch(actions.multipleTimeSeriesRequested({requests}));
}),
Expand Down Expand Up @@ -254,10 +335,15 @@ export class MetricsEffects implements OnInitEffects {
withLatestFrom(
this.store
.select(selectors.getExperimentIdsFromRoute)
.pipe(filter((experimentIds) => experimentIds !== null))
.pipe(filter((experimentIds) => experimentIds !== null)),
this.multiRunTagsToEid$
),
mergeMap(([fetchInfos, experimentIds]) => {
return this.fetchTimeSeriesForCards(fetchInfos, experimentIds!);
mergeMap(([fetchInfos, experimentIds, multiRunTagsToEid]) => {
return this.fetchTimeSeriesForCards(
fetchInfos,
experimentIds!,
multiRunTagsToEid
);
})
);

Expand Down Expand Up @@ -302,4 +388,5 @@ export class MetricsEffects implements OnInitEffects {
export const TEST_ONLY = {
getCardFetchInfo,
initAction,
generateMultiRunTagsToEidMapping,
};
Loading