diff --git a/frontend/src/components/constants.ts b/frontend/src/components/constants.ts index 172027f3e..a5f7de5eb 100644 --- a/frontend/src/components/constants.ts +++ b/frontend/src/components/constants.ts @@ -7,6 +7,7 @@ export const BUILDER_API_KEY = '4abd0efa0759420b88149ada5c1eb216'; // By default, most feature flags will be false when there's no embedded mode on. export const FEATURE_FLAGS = { enableKnowledgeBaseInConsoleUi: false, + enableNewPipelineLogs: false, enableRemoteMcpInConsole: false, enableRemoteMcpInspectorInConsole: false, enableRemoteMcpConnectClientInConsoleServerless: false, diff --git a/frontend/src/components/pages/rp-connect/pipeline/list.tsx b/frontend/src/components/pages/rp-connect/pipeline/list.tsx index 65b415ac6..e7f7624c1 100644 --- a/frontend/src/components/pages/rp-connect/pipeline/list.tsx +++ b/frontend/src/components/pages/rp-connect/pipeline/list.tsx @@ -12,72 +12,178 @@ import { create } from '@bufbuild/protobuf'; import { ConnectError } from '@connectrpc/connect'; import { Link as TanStackRouterLink, useNavigate } from '@tanstack/react-router'; -import type { ColumnDef, VisibilityState } from '@tanstack/react-table'; -import { flexRender, getCoreRowModel, getPaginationRowModel, useReactTable } from '@tanstack/react-table'; +import { + type ColumnDef, + type ColumnFiltersState, + flexRender, + getCoreRowModel, + getFacetedRowModel, + getFacetedUniqueValues, + getFilteredRowModel, + getPaginationRowModel, + type Table as TanStackTable, + useReactTable, +} from '@tanstack/react-table'; +import { Badge } from 'components/redpanda-ui/components/badge'; import { Button } from 'components/redpanda-ui/components/button'; -import { DataTablePagination } from 'components/redpanda-ui/components/data-table'; +import { DataTableFacetedFilter, DataTablePagination } from 'components/redpanda-ui/components/data-table'; +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuItem, + DropdownMenuSeparator, + DropdownMenuTrigger, +} from 'components/redpanda-ui/components/dropdown-menu'; +import { Input, InputEnd } from 'components/redpanda-ui/components/input'; +import { Label } from 'components/redpanda-ui/components/label'; import { Skeleton } from 'components/redpanda-ui/components/skeleton'; import { Spinner } from 'components/redpanda-ui/components/spinner'; import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from 'components/redpanda-ui/components/table'; import { Tabs, TabsContent, TabsContents, TabsList, TabsTrigger } from 'components/redpanda-ui/components/tabs'; import { Link, Text } from 'components/redpanda-ui/components/typography'; import { DeleteResourceAlertDialog } from 'components/ui/delete-resource-alert-dialog'; -import { AlertCircle, Trash2 } from 'lucide-react'; -import { DeletePipelineRequestSchema } from 'protogen/redpanda/api/console/v1alpha1/pipeline_pb'; -import type { Pipeline as APIPipeline, Pipeline_State } from 'protogen/redpanda/api/dataplane/v1/pipeline_pb'; -import { useCallback, useMemo, useState } from 'react'; +import { isFeatureFlagEnabled } from 'config'; +import { AlertCircle, MoreHorizontal, SearchIcon } from 'lucide-react'; +import { + DeletePipelineRequestSchema, + StartPipelineRequestSchema, + StopPipelineRequestSchema, +} from 'protogen/redpanda/api/console/v1alpha1/pipeline_pb'; +import { type Pipeline as APIPipeline, Pipeline_State } from 'protogen/redpanda/api/dataplane/v1/pipeline_pb'; +import { memo, useCallback, useMemo, useState } from 'react'; import { useKafkaConnectConnectorsQuery } from 'react-query/api/kafka-connect'; -import { useDeletePipelineMutation, useListPipelinesQuery } from 'react-query/api/pipeline'; +import { + useDeletePipelineMutation, + useListPipelinesQuery, + useStartPipelineMutation, + useStopPipelineMutation, +} from 'react-query/api/pipeline'; +import { type PipelineLogCounts, useStreamingPipelineLogCounts } from 'react-query/api/pipeline-messages'; import { toast } from 'sonner'; import { useResetOnboardingWizardStore } from 'state/onboarding-wizard-store'; import { formatToastErrorMessageGRPC } from 'utils/toast.utils'; +import { parse as parseYaml } from 'yaml'; -import { PipelineStatusBadge } from './status-badge'; +import { PIPELINE_STATE_OPTIONS, STARTABLE_STATES, STOPPABLE_STATES } from '../../../ui/pipeline/constants'; +import { PipelineLogIndicator } from '../../../ui/pipeline/pipeline-log-indicator'; +import { PipelineStatusBadge } from '../../../ui/pipeline/status-badge'; import { TabKafkaConnect } from '../../connect/overview'; -type Pipeline = { +// ============================================================================ +// Types +// ============================================================================ + +type BasePipeline = { id: string; name: string; description: string; state: Pipeline_State; + configYaml: string; + input?: string; + output?: string; +}; + +/** + * Pipeline data enriched with streaming log counts. + * This is the row data type used by TanStack Table. + */ +type Pipeline = BasePipeline & { + logCounts?: PipelineLogCounts; +}; + +type ParsedYamlConfig = { + input?: Record; + output?: Record; +}; + +const parseInputOutput = (configYaml: string): { input?: string; output?: string } => { + if (!configYaml) { + return {}; + } + try { + const config = parseYaml(configYaml) as ParsedYamlConfig | null; + if (!config) { + return {}; + } + + const inputObj = config.input; + const outputObj = config.output; + + return { + input: inputObj && typeof inputObj === 'object' ? Object.keys(inputObj)[0] : undefined, + output: outputObj && typeof outputObj === 'object' ? Object.keys(outputObj)[0] : undefined, + }; + } catch { + return {}; + } +}; + +const transformAPIPipeline = (apiPipeline: APIPipeline): BasePipeline => { + const { input, output } = parseInputOutput(apiPipeline.configYaml); + return { + id: apiPipeline.id, + name: apiPipeline.displayName, + description: apiPipeline.description, + state: apiPipeline.state, + configYaml: apiPipeline.configYaml, + input, + output, + }; }; -const transformAPIPipeline = (apiPipeline: APIPipeline): Pipeline => ({ - id: apiPipeline.id, - name: apiPipeline.displayName, - description: apiPipeline.description, - state: apiPipeline.state, -}); +// ============================================================================ +// Pagination Constants +// ============================================================================ -const PipelineTableSkeleton = () => ( +const PAGE_SIZE = 20; + +// ============================================================================ +// Skeleton Component +// ============================================================================ + +const PipelineListSkeleton = () => (
-
- +
+ +
- ID - Pipeline Name - Description - State - + + + + + + + + + + + + + + + {Array.from({ length: 5 }).map(() => ( - +
+ + +
- + - + - + @@ -89,16 +195,328 @@ const PipelineTableSkeleton = () => ( ); +// ============================================================================ +// Actions Cell - Pipeline row actions dropdown +// ============================================================================ + +type ActionsCellProps = { + pipeline: Pipeline; + navigate: ReturnType; + deleteMutation: ReturnType['mutate']; + startMutation: ReturnType['mutate']; + stopMutation: ReturnType['mutate']; + isDeletingPipeline: boolean; +}; + +const ActionsCell = memo( + ({ pipeline, navigate, deleteMutation, startMutation, stopMutation, isDeletingPipeline }: ActionsCellProps) => { + const canStart = (STARTABLE_STATES as readonly Pipeline_State[]).includes(pipeline.state); + const canStop = (STOPPABLE_STATES as readonly Pipeline_State[]).includes(pipeline.state); + const isStarting = pipeline.state === Pipeline_State.STARTING; + const isStopping = pipeline.state === Pipeline_State.STOPPING; + + const handleStart = () => { + const startRequest = create(StartPipelineRequestSchema, { + request: { id: pipeline.id }, + }); + startMutation(startRequest, { + onSuccess: () => { + toast.success('Pipeline started'); + }, + onError: (err) => { + toast.error( + formatToastErrorMessageGRPC({ + error: ConnectError.from(err), + action: 'start', + entity: 'pipeline', + }) + ); + }, + }); + }; + + const handleStop = () => { + const stopRequest = create(StopPipelineRequestSchema, { + request: { id: pipeline.id }, + }); + stopMutation(stopRequest, { + onSuccess: () => { + toast.success('Pipeline stopped'); + }, + onError: (err) => { + toast.error( + formatToastErrorMessageGRPC({ + error: ConnectError.from(err), + action: 'stop', + entity: 'pipeline', + }) + ); + }, + }); + }; + + const handleDelete = (id: string) => { + const deleteRequest = create(DeletePipelineRequestSchema, { + request: { id }, + }); + + deleteMutation(deleteRequest, { + onSuccess: () => { + toast.success('Pipeline deleted'); + }, + onError: (err) => { + toast.error( + formatToastErrorMessageGRPC({ + error: ConnectError.from(err), + action: 'delete', + entity: 'pipeline', + }) + ); + }, + }); + }; + + return ( +
+ + + + + + + navigate({ + to: '/rp-connect/$pipelineId/edit', + params: { pipelineId: encodeURIComponent(pipeline.id) }, + }) + } + > + Edit + + {isStarting ? Retry start : null} + {isStopping ? Retry stop : null} + {canStart ? Start : null} + {isStopping ? Start : null} + {canStop ? Stop : null} + + + + +
+ ); + } +); + +ActionsCell.displayName = 'ActionsCell'; + +// ============================================================================ +// Toolbar Component - Filters +// ============================================================================ + +type PipelineListToolbarProps = { + table: TanStackTable; + inputOptions: { label: string; value: string }[]; + outputOptions: { label: string; value: string }[]; +}; + +const PipelineListToolbar = ({ table, inputOptions, outputOptions }: PipelineListToolbarProps) => { + const isFiltered = table.getState().columnFilters.length > 0; + + return ( +
+
+
+ + ) => + table.getColumn('name')?.setFilterValue(event.target.value) + } + size="sm" + value={(table.getColumn('name')?.getFilterValue() as string) ?? ''} + > + + + + +
+
+ +
+ {inputOptions.length > 0 && table.getColumn('input') && ( + + )} + {outputOptions.length > 0 && table.getColumn('output') && ( + + )} + {table.getColumn('state') && ( + + )} + {isFiltered ? ( + + ) : null} +
+
+
+
+ ); +}; + +// ============================================================================ +// Table Columns +// ============================================================================ + +type CreateColumnsOptions = { + navigate: ReturnType; + deleteMutation: ReturnType['mutate']; + startMutation: ReturnType['mutate']; + stopMutation: ReturnType['mutate']; + isDeletingPipeline: boolean; + isNewPipelineLogsEnabled: boolean; +}; + +const createColumns = ({ + navigate, + deleteMutation, + startMutation, + stopMutation, + isDeletingPipeline, + isNewPipelineLogsEnabled, +}: CreateColumnsOptions): ColumnDef[] => [ + { + accessorKey: 'name', + header: 'Pipeline Name', + cell: ({ row }) => ( +
+ + {row.getValue('name')} + + {isNewPipelineLogsEnabled ? : null} +
+ ), + filterFn: (row, _id, filterValue: string) => { + const searchLower = filterValue.toLowerCase(); + return ( + row.original.name.toLowerCase().includes(searchLower) || + row.original.id.toLowerCase().includes(searchLower) || + row.original.description.toLowerCase().includes(searchLower) + ); + }, + }, + { + accessorKey: 'input', + header: 'Input', + cell: ({ row }) => { + const input = row.getValue('input') as string | undefined; + return ( +
+ {input ? {input} : null} + {isNewPipelineLogsEnabled ? : null} +
+ ); + }, + filterFn: (row, _id, filterValue: string[]) => { + if (filterValue.length === 0) { + return true; + } + return Boolean(row.original.input && filterValue.includes(row.original.input)); + }, + }, + { + accessorKey: 'output', + header: 'Output', + cell: ({ row }) => { + const output = row.getValue('output') as string | undefined; + return ( +
+ {output ? {output} : null} + {isNewPipelineLogsEnabled ? : null} +
+ ); + }, + filterFn: (row, _id, filterValue: string[]) => { + if (filterValue.length === 0) { + return true; + } + return Boolean(row.original.output && filterValue.includes(row.original.output)); + }, + }, + { + accessorKey: 'state', + header: 'Status', + cell: ({ row }) => , + filterFn: (row, _id, filterValue: string[]) => { + if (filterValue.length === 0) { + return true; + } + return filterValue.includes(String(row.original.state)); + }, + }, + { + id: 'actions', + enableHiding: false, + cell: ({ row }) => ( + + ), + }, +]; + +// ============================================================================ +// Main Pipeline List Component +// ============================================================================ + const PipelineListPageContent = () => { const navigate = useNavigate(); const resetOnboardingWizardStore = useResetOnboardingWizardStore(); - const [columnVisibility, setColumnVisibility] = useState({}); - const [rowSelection, setRowSelection] = useState({}); - const { data: pipelinesData, isLoading, error } = useListPipelinesQuery(); + // Table state + const [columnFilters, setColumnFilters] = useState([]); + + const { + data: pipelinesData, + isLoading, + error, + } = useListPipelinesQuery(undefined, { + enableSmartPolling: true, + }); const { mutate: deleteMutation, isPending: isDeletingPipeline } = useDeletePipelineMutation(); + const { mutate: startMutation } = useStartPipelineMutation(); + const { mutate: stopMutation } = useStopPipelineMutation(); + const isNewPipelineLogsEnabled = isFeatureFlagEnabled('enableNewPipelineLogs'); - const pipelines = useMemo( + // Transform API data to base pipeline objects + const basePipelines = useMemo( () => (pipelinesData?.pipelines || []) .filter( @@ -108,125 +526,96 @@ const PipelineListPageContent = () => { [pipelinesData] ); - const handleCreateClick = useCallback(() => { - resetOnboardingWizardStore(); - navigate({ - to: '/rp-connect/wizard', - search: { step: undefined, serverless: undefined }, - }); - }, [resetOnboardingWizardStore, navigate]); + // Get all pipeline IDs for streaming log counts + const allPipelineIds = useMemo(() => basePipelines.map((p) => p.id), [basePipelines]); + const { counts } = useStreamingPipelineLogCounts(allPipelineIds, isNewPipelineLogsEnabled); - const columns = useMemo[]>( - () => [ - { - accessorKey: 'id', - header: 'ID', - cell: ({ row }) => ( - - {row.getValue('id')} - - ), - enableSorting: false, - size: 120, - }, - { - accessorKey: 'name', - header: 'Pipeline Name', - cell: ({ row }) => ( - - {row.getValue('name')} - - ), - size: 250, - }, - { - accessorKey: 'description', - header: 'Description', - cell: ({ row }) => {row.getValue('description')}, - enableSorting: false, - size: 400, - }, - { - accessorKey: 'state', - header: 'State', - cell: ({ row }) => , - size: 120, - }, - { - id: 'actions', - enableHiding: false, - cell: ({ row }) => { - const handleDelete = (id: string) => { - const deleteRequest = create(DeletePipelineRequestSchema, { - request: { id }, - }); - - deleteMutation(deleteRequest, { - onSuccess: () => { - toast.success('Pipeline deleted'); - }, - onError: (err) => { - toast.error( - formatToastErrorMessageGRPC({ - error: ConnectError.from(err), - action: 'delete', - entity: 'pipeline', - }) - ); - }, - }); - }; - - return ( -
- } - buttonText={undefined} - buttonVariant="destructive-ghost" - isDeleting={isDeletingPipeline} - onDelete={handleDelete} - resourceId={row.original.id} - resourceName={row.original.name} - resourceType="Pipeline" - triggerVariant="button" - /> -
- ); - }, - size: 60, - }, - ], - [deleteMutation, isDeletingPipeline] + // Enrich pipelines with log counts - this is the table's data source + const pipelines: Pipeline[] = useMemo( + () => basePipelines.map((p) => ({ ...p, logCounts: counts.get(p.id) })), + [basePipelines, counts] + ); + + // Memoize columns to avoid re-renders + const columns = useMemo( + () => + createColumns({ + navigate, + deleteMutation, + startMutation, + stopMutation, + isDeletingPipeline, + isNewPipelineLogsEnabled, + }), + [navigate, deleteMutation, startMutation, stopMutation, isDeletingPipeline, isNewPipelineLogsEnabled] ); const table = useReactTable({ data: pipelines, columns, + onColumnFiltersChange: setColumnFilters, getCoreRowModel: getCoreRowModel(), getPaginationRowModel: getPaginationRowModel(), - onColumnVisibilityChange: setColumnVisibility, - onRowSelectionChange: setRowSelection, + getFilteredRowModel: getFilteredRowModel(), + getFacetedRowModel: getFacetedRowModel(), + getFacetedUniqueValues: getFacetedUniqueValues(), initialState: { pagination: { - pageSize: 20, + pageSize: PAGE_SIZE, }, }, state: { - columnVisibility, - rowSelection, + columnFilters, }, }); + const handleCreateClick = useCallback(() => { + resetOnboardingWizardStore(); + navigate({ + to: '/rp-connect/wizard', + search: { step: undefined, serverless: undefined }, + }); + }, [resetOnboardingWizardStore, navigate]); + + // Generate filter options from pipeline data + const inputOptions = useMemo(() => { + const uniqueInputs = new Set(); + for (const pipeline of pipelines) { + if (pipeline.input) { + uniqueInputs.add(pipeline.input); + } + } + return Array.from(uniqueInputs) + .sort() + .map((input) => ({ label: input, value: input })); + }, [pipelines]); + + const outputOptions = useMemo(() => { + const uniqueOutputs = new Set(); + for (const pipeline of pipelines) { + if (pipeline.output) { + uniqueOutputs.add(pipeline.output); + } + } + return Array.from(uniqueOutputs) + .sort() + .map((output) => ({ + label: output, + value: output, + })); + }, [pipelines]); + if (isLoading) { - return ; + return ; + } + + if (error) { + return ( +
+ + Error loading pipelines: {error.message} +
+ ); } return ( @@ -234,6 +623,7 @@ const PipelineListPageContent = () => {
+
{table.getHeaderGroups().map((headerGroup) => ( @@ -248,46 +638,45 @@ const PipelineListPageContent = () => { {(() => { - if (isLoading) { - return ; - } - if (error) { + const rows = table.getRowModel().rows; + if (rows.length === 0) { return ( -
- - Error loading pipelines: {error.message} -
+ {basePipelines.length === 0 + ? 'You have no Redpanda Connect pipelines' + : 'No pipelines match your filters'}
); } - if (table.getRowModel().rows?.length) { - return table.getRowModel().rows.map((row) => ( - - {row.getVisibleCells().map((cell) => ( - {flexRender(cell.column.columnDef.cell, cell.getContext())} - ))} - - )); - } - return ( - - - You have no Redpanda Connect pipelines - + return rows.map((row) => ( + + {row.getVisibleCells().map((cell) => ( + {flexRender(cell.column.columnDef.cell, cell.getContext())} + ))} - ); + )); })()}
- +
+ {isNewPipelineLogsEnabled ? ( + + † Log counts are derived from a sample of recent pipeline logs and may not reflect the complete historical + data. + + ) : null} + +
); }; -// Separate component to prevent unnecessary remounting +// ============================================================================ +// Page Wrapper Components +// ============================================================================ + const RedpandaConnectContent = () => (
@@ -303,19 +692,11 @@ const RedpandaConnectContent = () => ( ); export const PipelineListPage = () => { - // Don't block on Kafka Connect query - it's optional and additive - // If query fails or is loading, we just don't show the Kafka Connect tab (fail gracefully) const { data: kafkaConnectors, isLoading: isLoadingKafkaConnect } = useKafkaConnectConnectorsQuery(); - // Only show Kafka Connect tab if it's explicitly configured - // This check is intentionally strict (=== true) to avoid showing the tab on errors/undefined const isKafkaConnectEnabled = kafkaConnectors?.isConfigured === true; - - // Show that we're checking for Kafka Connect in the background - // This is subtle and doesn't block the user experience const showKafkaConnectLoadingHint = isLoadingKafkaConnect && !kafkaConnectors; - // If Kafka Connect is not enabled, just show Redpanda Connect content if (!isKafkaConnectEnabled) { return (
@@ -330,7 +711,6 @@ export const PipelineListPage = () => { ); } - // If Kafka Connect is enabled, show tabs with intro text return (
diff --git a/frontend/src/components/pages/rp-connect/pipeline/status-badge.tsx b/frontend/src/components/pages/rp-connect/pipeline/status-badge.tsx deleted file mode 100644 index c0d5f41ad..000000000 --- a/frontend/src/components/pages/rp-connect/pipeline/status-badge.tsx +++ /dev/null @@ -1,59 +0,0 @@ -import { Badge } from 'components/redpanda-ui/components/badge'; -import { AlertCircle, Check, Loader2, Pause } from 'lucide-react'; -import { Pipeline_State } from 'protogen/redpanda/api/dataplane/v1/pipeline_pb'; -import { useMemo } from 'react'; - -export const PipelineStatusBadge = ({ state }: { state?: Pipeline_State }) => { - const statusConfig = useMemo(() => { - switch (state) { - case Pipeline_State.RUNNING: - return { - variant: 'success-inverted' as const, - icon: , - text: 'Running', - }; - case Pipeline_State.STARTING: - return { - variant: 'info-inverted' as const, - icon: , - text: 'Starting', - }; - case Pipeline_State.STOPPING: - return { - variant: 'warning-inverted' as const, - icon: , - text: 'Stopping', - }; - case Pipeline_State.STOPPED: - return { - variant: 'neutral-inverted' as const, - icon: , - text: 'Stopped', - }; - case Pipeline_State.COMPLETED: - return { - variant: 'success-inverted' as const, - icon: , - text: 'Completed', - }; - case Pipeline_State.ERROR: - return { - variant: 'destructive-inverted' as const, - icon: , - text: 'Error', - }; - default: - return { - variant: 'neutral-inverted' as const, - icon: , - text: 'Unknown', - }; - } - }, [state]); - - return ( - - {statusConfig.text} - - ); -}; diff --git a/frontend/src/components/redpanda-ui/components/data-table.tsx b/frontend/src/components/redpanda-ui/components/data-table.tsx index 49ef8abb5..6b8dce228 100644 --- a/frontend/src/components/redpanda-ui/components/data-table.tsx +++ b/frontend/src/components/redpanda-ui/components/data-table.tsx @@ -6,6 +6,7 @@ import { ArrowRight, ArrowUp, CheckCircle, + ChevronDown, ChevronLeft, ChevronRight, ChevronsLeft, @@ -14,7 +15,6 @@ import { Circle, CircleOff, EyeOff, - FilterIcon, HelpCircle, MoreHorizontal, Settings2, @@ -282,7 +282,7 @@ export function DataTableFacetedFilter({ return ( - - + @@ -380,9 +380,15 @@ export function DataTableFacetedFilter({ interface DataTablePaginationProps { table: Table; testId?: string; + /** Hide the page size selector. Useful when page size should be fixed. */ + hidePageSizeSelector?: boolean; } -export function DataTablePagination({ table, testId }: DataTablePaginationProps) { +export function DataTablePagination({ + table, + testId, + hidePageSizeSelector = false, +}: DataTablePaginationProps) { return (
@@ -391,26 +397,28 @@ export function DataTablePagination({ table, testId }: DataTablePaginatio Page {table.getState().pagination.pageIndex + 1} of {table.getPageCount()}
)} -
-

Rows per page

- -
+ {!hidePageSizeSelector && ( +
+

Rows per page

+ +
+ )}