Skip to content

Commit 3fab5ed

Browse files
worker messaging implemented
1 parent 50aa46b commit 3fab5ed

File tree

3 files changed

+57
-64
lines changed

3 files changed

+57
-64
lines changed
File renamed without changes.

src/saga/index.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import { fork, all } from 'redux-saga/effects';
22
import authSaga from './auth-saga';
33
import itemsSaga from './items-saga';
4-
import { watchActionMessage, watchWorkerMessages } from './search-worker-saga';
4+
import { workerRecieverSaga, workerSenderSaga } from './search-worker-saga';
55

66
export default function* rootSaga() {
77
yield all([
88
fork(authSaga),
99
fork(itemsSaga),
10-
fork(watchActionMessage),
11-
fork(watchWorkerMessages),
10+
fork(workerRecieverSaga),
11+
fork(workerSenderSaga),
1212
]);
1313
}

src/saga/search-worker-saga.js

+54-61
Original file line numberDiff line numberDiff line change
@@ -1,75 +1,68 @@
1-
import { take, actionChannel, call, fork, put, apply } from 'redux-saga/effects'
2-
import { searchRequest, getSearchResultSuccess } from '../reducers/items-reducer'
1+
// import { take, call, eventChannel, put } from "redux-saga"
2+
import { take, call, put, takeLatest} from 'redux-saga/effects';
3+
import { searchRequest } from '../reducers/items-reducer'
34
import { eventChannel } from 'redux-saga';
4-
let searchWorker;
5-
function* handleRequest(payload) {
6-
yield call(searchWorker.postMessage(payload))
7-
}
8-
// this function creates an event channel from a given socket
9-
// Setup subscription to incoming `ping` events
10-
function createSocketChannel(searchWorker) {
11-
// `eventChannel` takes a subscriber function
12-
// the subscriber function takes an `emit` argument to put messages onto the channel
13-
return eventChannel(emit => {
5+
const wsUrl = './workers/search-worker.js'
146

15-
const pingHandler = (event) => {
16-
// puts event payload into the channel
17-
// this allows a Saga to take this payload from the returned channel
18-
console.log(event.payload, 'Recieved')
19-
emit(event.payload)
20-
}
7+
const ws = new Worker(wsUrl)
218

22-
const errorHandler = (errorEvent) => {
23-
// create an Error object and put it into the channel
24-
emit(new Error(errorEvent.reason))
25-
}
269

27-
// setup the subscription
28-
searchWorker.onmessage = pingHandler;
29-
searchWorker.onerror = errorHandler;
10+
export const post = (msg) => ws.postMessage(msg);
11+
3012

31-
// the subscriber must return an unsubscribe function
32-
// this will be invoked when the saga calls `channel.close` method
33-
const unsubscribe = () => {
34-
searchWorker.terminate();
13+
function initWebsocket() {
14+
return eventChannel(emitter => {
15+
ws.onerror = (error) => {
16+
console.log('Worker error ' + error)
17+
console.dir(error)
3518
}
19+
ws.onmessage = (e) => {
20+
let msg = null
21+
try {
22+
console.error(e.data)
23+
msg = JSON.parse(e.data)
3624

37-
return unsubscribe
25+
} catch (e) {
26+
console.error(`Error parsing : ${e.data}`)
27+
}
28+
if (msg) {
29+
const { payload: book } = msg
30+
const channel = msg.channel
31+
switch (channel) {
32+
case 'ADD_BOOK':
33+
return emitter({ type: 'ADD_BOOK', book })
34+
case 'REMOVE_BOOK':
35+
return emitter({ type: 'REMOVE_BOOK', book })
36+
default:
37+
// nothing to do
38+
}
39+
}
40+
}
41+
// unsubscribe function
42+
return () => {
43+
ws.terminate()
44+
console.log('Socket off')
45+
}
3846
})
3947
}
40-
// reply with a `pong` message by invoking `socket.emit('pong')`
41-
function* pong(socket) {
42-
yield apply(socket, socket.emit, ['pong']) // call `emit` as a method with `socket` as context
43-
}
44-
const createConnection = () => {
45-
searchWorker = new Worker('../workers/search-worker.js');
46-
return searchWorker;
47-
}
48-
export function* watchWorkerMessages() {
49-
const socket = yield call(createConnection)
50-
const socketChannel = yield call(createSocketChannel, socket)
48+
function* doSendToWorker(action) {
49+
try {
50+
const res = yield call(post, action.payload)
51+
yield put({ type: 'POSTED', res })
52+
} catch (e) {
53+
console.log(e, 'Error');
5154

52-
while (true) {
53-
try {
54-
// An error from socketChannel will cause the saga jump to the catch block
55-
const payload = yield take(socketChannel)
56-
yield put({ type: getSearchResultSuccess.type, payload })
57-
yield fork(pong, socket)
58-
} catch (err) {
59-
console.error('socket error:', err)
60-
// socketChannel is still open in catch block
61-
// if we want end the socketChannel, we need close it explicitly
62-
// socketChannel.close()
63-
}
55+
yield put({ type: 'ERROR', error: e.json })
6456
}
6557
}
66-
export function* watchActionMessage() {
67-
// 1- Create a channel for request actions
68-
const requestChan = yield actionChannel(searchRequest.type)
58+
59+
export function* workerRecieverSaga() {
60+
const channel = yield call(initWebsocket)
6961
while (true) {
70-
// 2- take from the channel
71-
const { payload } = yield take(requestChan)
72-
// 3- Note that we're using a blocking call
73-
yield call(handleRequest, payload)
62+
const action = yield take(channel)
63+
yield put(action)
7464
}
75-
}
65+
}
66+
export function* workerSenderSaga() {
67+
yield takeLatest(searchRequest.type, doSendToWorker);
68+
}

0 commit comments

Comments
 (0)