TCPおよびWebSocketの標準を使用したメッセージの送受信プロセスは異なります。以下では、両方の接続タイプについてこのプロセスを詳しく説明します。
TCPの使用¶
メッセージの送信¶
TCP接続経由でメッセージを送信するには、以下の手順を実行します。
Protobufメッセージを、選択したプログラミング言語の公式Google Protocol Buffer SDKを使用して、バイトの配列(Protobufエンコーディングを使用)に変換します。
ステップ1で作成した配列の長さを取得します。この整数から新しいバイト配列を作成します。新しいバイト配列を反転させます。
新しいバイト配列と元のProtobufメッセージを含むバイト配列を連結します。
連結された配列を接続ストリームに送信します。
以下の例では、公式のOpen API SDKでこれらの手順をどのように実行するかを示しています。
private async Task WriteTcp(byte[] messageByte, CancellationToken cancellationToken)
{
byte[] array = BitConverter.GetBytes(messageByte.Length).Reverse().Concat(messageByte)
.ToArray();
await _sslStream.WriteAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
await _sslStream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
request = ProtoOAApplicationAuthReq() # Can be any message
deferred = client.send(request)
- Note
Twistedを使用すると、Pythonの例はC#の例とほぼ同じ操作を実行します。client.send(request)
は以下のように説明できます。
request = ProtoOAApplicationAuthReq() # Can be any message
requestAsString = request.SerializeToString() # This method is a part of the Google Protobuf SDK
requestAsInt32String = struct.pack("!H", len(requestAsString)) # The message is concatenated with the reversed array
メッセージの送信には、Python SDKはProtocol.transport.write()
メソッドを使用します。
メッセージの読み取り¶
非同期コード
すべてのOpen API SDKは非同期実行に依存しています。つまり、メッセージの到着を待たずに、動的に到着するメッセージに反応します。その結果、メッセージの受信は通常、イベントハンドラを介して行われます。
メッセージを読み取るには、メッセージを送信するために必要な手順を逆にする一連のアクションを実行する必要があります。
バイト配列の最初の4バイトを受信します(メッセージの長さを示すことを覚えておいてください)。これらの4バイトを逆順にし、整数に変換します。
ステップ1で取得した整数の量、つまり
X
個のバイトをストリームから読み取ります。Google Protobuf SDKを使用して、メッセージを有効な
ProtoMessage
に逆シリアル化します。ProtoMessage
オブジェクトのpayloadType
フィールドを使用して、その実際のタイプを見つけます。Google Protobuf SDKを介して、ProtoMessage
を必要なProtoOA...
タイプのオブジェクトに変更します。
以下のコードスニペットは、公式のOpen API SKDがメッセージの読み取りにアプローチする方法を示しています。
_tcpClient = new TcpClient
{
LingerState = new LingerOption(enable: true, 10)
};
await _tcpClient.ConnectAsync(Host, Port).ConfigureAwait(continueOnCapturedContext: false);
SslStream _sslStream = new SslStream(_tcpClient.GetStream(), leaveInnerStreamOpen: false);
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(continueOnCapturedContext: false);
private async void ReadTcp(CancellationToken cancellationToken)
{
byte[] dataLength = new byte[4];
byte[] data = null;
try
{
while (!IsDisposed)
{
int num = 0;
do
{
int count = dataLength.Length - num;
int num2 = num;
num = num2 + await _sslStream.ReadAsync(dataLength, num, count, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
if (num == 0)
{
throw new InvalidOperationException("Remote host closed the connection");
}
}
while (num < dataLength.Length);
int length = GetLength(dataLength);
if (length <= 0)
{
continue;
}
data = ArrayPool.Shared.Rent(length);
num = 0;
do
{
int count2 = length - num;
int num2 = num;
num = num2 + await _sslStream.ReadAsync(data, num, count2, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
if (num == 0)
{
throw new InvalidOperationException("Remote host closed the connection");
}
}
while (num < length);
ProtoMessage protoMessage = ProtoMessage.Parser.ParseFrom(data, 0, length);
ArrayPool.Shared.Return(data);
OnNext(protoMessage);
}
}
catch (Exception innerException)
{
if (data != null)
{
ArrayPool.Shared.Return(data);
}
ReceiveException exception = new ReceiveException(innerException);
OnError(exception);
}
}
private int GetLength(byte[] lengthBytes)
{
Span span = lengthBytes.AsSpan();
span.Reverse();
return BitConverter.ToInt32(span);
}
def stringReceived(self, data):
msg = ProtoMessage()
msg.ParseFromString(data)
if msg.payloadType == ProtoHeartbeatEvent().payloadType:
self.heartbeat()
self.factory.received(msg)
return data
def stringReceived(self, data):
msg = ProtoMessage()
msg.ParseFromString(data)
if msg.payloadType == ProtoHeartbeatEvent().payloadType:
self.heartbeat()
self.factory.received(msg)
return data
WebSocketの使用方法
メッセージの送信
WebSocket接続経由でメッセージを送信するには、以下の手順を実行します。
メッセージを適切なデータ形式(例:文字列)にシリアル化します。
シリアル化されたメッセージを送信キューに追加します。
以下の例では、公式のOpen API SDKでこれらのアクションがどのように実行されるかを示しています。
C# SDK は WebsocketClient
クラスを使用します。これは Websocket.Client
パッケージの一部です。以下に示すように、WebsocketClient.Send()
メソッドは次のように機能します。
public void Send(byte[] message)
{
Websocket.Client.Validations.Validations.ValidateInput(message, "message");
_messagesBinaryToSendQueue.Writer.TryWrite(new ArraySegment(message));
}
メッセージを受信する¶
WebSocket接続でメッセージを受信するには、以下の手順を実行します。
cTraderバックエンドから受信したデータを取得します。
データを有効なProtobufメッセージに逆シリアル化します。
公式のOpen API SDKでこれがどのように行われるかのイラストについては、以下のスニペットを参照してください。
メッセージを受信するには、WebsocketClient
が新しいメッセージを受け入れる際にクライアントが行う処理を処理するコールバック関数に登録する必要があります。
var client = new WebsocketCliebnt();
client.MessageReceived.Subscribe(msg => {Console.WriteLine(msg);})
購読する前に、.NET SDK はメッセージを Protobuf メッセージにパースします。必要な購読は、ConnectWebSocket()
コールバックの本体に追加されます。
private async Task ConnectWebScoket()
{
DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(7, 2);
defaultInterpolatedStringHandler.AppendLiteral("wss://");
defaultInterpolatedStringHandler.AppendFormatted(Host);
defaultInterpolatedStringHandler.AppendLiteral(":");
defaultInterpolatedStringHandler.AppendFormatted(Port);
Uri url = new Uri(defaultInterpolatedStringHandler.ToStringAndClear());
_websocketClient = new WebsocketClient(url, () => new ClientWebSocket())
{
IsTextMessageConversionEnabled = false,
ReconnectTimeout = null,
IsReconnectionEnabled = false,
ErrorReconnectTimeout = null
};
_webSocketMessageReceivedDisposable = _websocketClient.MessageReceived.Select((ResponseMessage msg) => ProtoMessage.Parser.ParseFrom(msg.Binary)).Subscribe(new Action(OnNext));
_webSocketDisconnectionHappenedDisposable = _websocketClient.DisconnectionHappened.Subscribe(new Action(OnWebSocketDisconnectionHappened));
await _websocketClient.StartOrFail();
}
OnNext()
コールバックでは、ProtoMessage
が MessageFactory.GetMessage()
コールバックに渡されます。
private void OnNext(ProtoMessage protoMessage)
{
foreach (KeyValuePair> observer2 in _observers)
{
observer2.Deconstruct(out var _, out var value);
IObserver observer = value;
try
{
IMessage message = MessageFactory.GetMessage(protoMessage);
if (protoMessage.HasClientMsgId || message == null)
{
observer.OnNext(protoMessage);
}
if (message != null)
{
observer.OnNext(message);
}
}
catch (Exception innerException)
{
ObserverException exception = new ObserverException(innerException, observer);
OnError(exception);
}
}
}