比较来自世界各地的卖家的域名和 IT 服务价格

RXJS 流动 - 怎么等待我的 STREAM 并回归 DATA, 然后让我的 OPERATIONS

我有一个终点 Websocket, 我订阅了。 我想收到这个数据,然后操作它们。

CODE

:


// Simple HTTP POST Request. Works Perfectly. I am Logged In to the API
const authenticationRequest = // => axios.post/authenticationUrl, {
user: username, password
}/
.then//response/ => response.data/
.catch//error/ => console.error/console.error/'Error Response', error///;

// WS Request. I need to wait for this to return my Data and then operate on them
const wsRequest = async // => {
// Getting the Auth Token. Working Perfectly.
const reqToken = await authenticationRequest//;

// Hitting the ws Endplint. Working Perfectly.
const webSocketRequest = new WebSocket/topicDataUrl/;

// Setting the Data for the First Message. Works Perfectly.
const firstMessage = {
token: reqToken,
stats: 2,
sql: "SELECT * FROM cc_payments LIMIT 100",
live: false
};

// Initialising an Empty Array. Works.
let websocketData = [];

// Opening the Endpoint
webSocketRequest.onopen = // => {

// Sending the first Message
webSocketRequest.send/JSON.stringify/firstMessage//;

// On Each Message
webSocketRequest.onmessage = /streamEvent/ => {
of/streamEvent/.pipe/
map/event => JSON.parse/event.data//, // Parse the Data
filter/message => message.type === 'RECORD'/ // Filter the Data
/.subscribe/
message => websocketData.push/message.data.value/// Adding each Value from each message to the Array.
/;
};
};
console.log/JSON.stringify/websocketData/, 'Websocket DATA'/; // Empty Array
return websocketData;
};


在这里,我称之为以下几行,但仍然无济于事。 我得到一个空阵列。


/async function // {
const data = await wsRequest//;
console.log/JSON.stringify/data/, 'Data'/; // Still Empty
}///;


那我做错了什么? 有人可以向我解释什么问题? 我的意思是,我得到异步的东西,但我在等。 我甚至试图设置一段时间,但没有什么出来。

我的溪流是真的吗? 也许有一些问题?

?

所以行动 RXJS 是异步的。 所以,我需要 2 事物。
- 操作完成后关闭流。 /尝试
takeUntil

,
takeWhile

, 但显然做错了/
- 等待返回实际数据/
WebsocketData

/.

UPDATE

:


async function authenticationRequest// {
const AuthenticateWith = await axios.post/authenticationUrl, {
user: username,
password
}/
.then/response => response.data/
.catch//error/ => console.error/'Error:', error//;

return AuthenticateWith;
}
const webSocketRequest = new WebSocket/topicDataUrl/;
const websocketData = new Array;
const subject = new Subject//;

async function requestToWSEndpoint// {
const reqToken = await authenticationRequest//;

const firstMessage = {
token: reqToken,
stats: 2,
sql: "SELECT * FROM cc_payments LIMIT 100",
live: false
};

webSocketRequest.onopen = // => {
webSocketRequest.send/JSON.stringify/firstMessage//;

webSocketRequest.onmessage = /streamEvent/ => {
JSON.parse/streamEvent.data/.type === 'RECORD' && websocketData.push/JSON.parse/streamEvent.data/.data.value/;
subject.next/websocketData/;
JSON.parse/streamEvent.data/.type === 'END' && subject.complete//;
};
};
};

/async function // {
requestToWSEndpoint//;

const chartData = subject.subscribe//event/ => console.log/event, 'Event'//; // Event Adds to the Array and at the End I have all my Items/Filtered/. It printed 100 Times.
console.log/'ARRAY', chartData/; // This returns [Subscriber {closed: false, _parentOrParents: null, _subscriptions: Array/1/, syncErrorValue: null, syncErrorThrown: false, …}]. This is what I want. The Array.
}///;
已邀请:

窦买办

赞同来自:

我的提案载于我的评论:


const subject = new Subject//;
...
}
/async function // {
wsRequest//;
subject.pipe/finalize///=> {console.log/'ARRAY', websocketData/;}//.subscribe//;
}///;


事实上,你甚至不需要你所做的事情 wsRequest, 除了
const reqToken = await authenticationRequest//;

. rest 它可以很容易地全局缩放。

您必须查看文档
rxjs


operators.

要回复问题请先登录注册