Setting callbacks to respond to incoming messages on a LightStreamer channel.

I want to read an open channel from LightStreamer, in particular the LightStreamer service from IG.
LS operates in two modes: streaming and polling (streaming being preferred). I have quite successfully written a lot of code to handle a polling connection (setting LS_polling=true on /lightstreamer/create_session.txt; false is the default), but handling a streaming connection would be much more efficient and would simplify the process considerably.
I've been using Jim Hokanson's excellent URLREAD2 function for polling, but it appears to hang in streaming mode at line 263 of the following excerpt, in case that is helpful to anyone.
260 byteArrayOutputStream = java.io.ByteArrayOutputStream;
261 % This StreamCopier is unsupported and may change at any time. OH GREAT :/
262 isc = InterruptibleStreamCopier.getInterruptibleStreamCopier;
263 isc.copyStream(inputStream,byteArrayOutputStream);
264 inputStream.close;
265 byteArrayOutputStream.close;
I'm wondering if some modification to URLREAD2 might be possible that would open the connection and allow the setting of callbacks to handle incoming messages?
  2 Comments
Neil Caithness on 28 Apr 2015
Just to flesh it out I have in mind something like this:
% URLREAD2L with a listener
[output, extras, listener] = urlread2l(url, method, body, headers)
set(listener, 'Callback', @onStream)
or, better, as part of varargin to urlread2l:
[output, extras, listener] = urlread2l(url, method, body, headers, ...
    'Callback', @onStream)
Neil Caithness on 29 Apr 2015
Edited: Neil Caithness on 29 Apr 2015
I've made some progress with this. The StreamCopier above needs to be intercepted and replaced with something that detects EOLs (\r\n) and fires an event notifier.
Something like this: (a bit crude but it lets me experiment with the stream for now)
buffer = zeros(1,256);  % preallocated; MATLAB grows it if a line exceeds 256 bytes
n = 0;                  % number of bytes held for the current line
try
    while true
        b = inputStream.read;   % blocks; returns -1 at end of stream
        if b < 0, break, end
        n = n + 1;
        buffer(n) = b;
        if n >= 2 && isequal(buffer(n-1:n),[13 10])  % "\r\n"
            output = char(buffer(1:n));
            n = 0;                 % <---- reset
            fprintf('%s',output)   % <---- replace with event notifier
        end
    end
catch ME
    % TODO
end
Output is something like the following:
OK
SessionId:S3d11cddde1b7455aT4829892
ControlAddress:apd120a.marketdatasystems.com
KeepaliveMillis:5000
MaxBandwidth:0.0
RequestLimit:50000
1,3|15356.4|TRADEABLE|0.5714285
1,1|7991.0|TRADEABLE|0.5714285
1,8|12055.4|TRADEABLE|0.5524861
1,6|18317.7|TRADEABLE|0.5714285
1,7|9533.7|TRADEABLE|0.5524861
1,5|13116.0|TRADEABLE|0.5714285
1,4|11928.5|TRADEABLE|0.5524861
1,2|10995.5|TRADEABLE|0.5524861
...
So I guess my question becomes: is there already an efficient way to do this, one that I don't know about, in the Java classes that ship alongside com.mathworks.mlwidgets.io.InterruptibleStreamCopier?
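One possible route, offered as a sketch rather than something tested against the IG service: the standard Java class java.io.BufferedReader already splits a stream into lines through its readLine method, and it can be called from MATLAB. The function name readStreamLines and the onLine callback are hypothetical names chosen for illustration, and the UTF-8 charset is an assumption. Note that readLine blocks until a complete line arrives and accepts \r\n, \n, or \r as the terminator.

```matlab
function readStreamLines(inputStream, onLine)
% READSTREAMLINES  Call onLine(line) for each line read from a Java InputStream.
% Hypothetical helper: the name and the onLine callback are illustrative only.
% Assumes the stream carries UTF-8 text.
reader = java.io.BufferedReader( ...
    java.io.InputStreamReader(inputStream, 'UTF-8'));
line = reader.readLine();      % blocks until a full line is available
while ischar(line)             % Java null (end of stream) arrives as []
    onLine(line);              % the line terminator is already stripped
    line = reader.readLine();
end
reader.close();                % also closes the underlying stream
end
```

For example, readStreamLines(urlConnection.getInputStream, @disp) would print each line as it arrives. The call does not return until the server closes the stream, so it still ties up the MATLAB session while streaming.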


Accepted Answer

Neil Caithness on 30 Apr 2015
This is somewhat like having a conversation with myself (it happens). Anyway, having started, I'll continue. It turns out not to be as complicated as I thought it would be. I can strip out much of URLREAD2 for this dedicated situation, and then decoding the stream is very well defined. All that's left is to cast this into a classdef and handle the events.
function output = urlreadls(url,body,varargin)
% + URL
try
    protocol = url(1:find(url==':',1)-1);
    handler = sun.net.www.protocol.(protocol).Handler;
    url = java.net.URL([],url,handler);
    urlConnection = url.openConnection;
    urlConnection.setRequestMethod('POST');
catch ME
    % TODO
    rethrow(ME)
end
% + BODY
try
    body = unicode2native(body,'');
    urlConnection.setRequestProperty('Content-Length',int2str(length(body)));
    urlConnection.setDoOutput(true);
catch ME
    % TODO
    rethrow(ME)
end
% + POST
try
    outputStream = urlConnection.getOutputStream;
    outputStream.write(body);
    outputStream.close;
catch ME
    % TODO
    rethrow(ME)
end
% + STREAM
inputStream = urlConnection.getInputStream;
% Read the first line; it will be either "ERROR", "OK",
% or possibly HTTP error 500.
line = linereader(inputStream);
switch line
    case 'ERROR'
        % There will be 2 more lines only; then close the stream.
        output.ERROR.err_code = linereader(inputStream);
        output.ERROR.err_msg = linereader(inputStream);
        inputStream.close;
    case 'OK'
        % There will be between 3 and 6 lines following in the form:
        % "ParamName:" + ParamValue + "\r\n", followed finally by a blank
        % line. The blank line marks the start of the push phase.
        line = linereader(inputStream);
        while ~isempty(line)
            % Split at the first ':' only, in case the value contains one.
            [name,rest] = strtok(line,':');
            output.OK.(name) = rest(2:end);
            line = linereader(inputStream);
        end
        % Handle the push phase.
        try
            while true
                line = linereader(inputStream);
                disp(line)
            end
        catch ME
            % TODO: the loop ends here when a read fails.
        end
        inputStream.close;
    otherwise
        % Probably HTTP 500; I don't know exactly what this looks like
        % until I see one. TODO: confirm it's a 500 or handle separately.
        output.ERROR.err_code = '500';
        output.ERROR.err_msg = line;
        save errorline line
        inputStream.close;
end
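function line = linereader(inputStream)
% Read bytes up to and including the next "\r\n"; return the line without it.
byteArray = [];
while numel(byteArray) < 2 || ~isequal(byteArray(end-1:end),[13 10])
    b = inputStream.read;   % blocks; returns -1 at end of stream
    if b < 0
        error('urlreadls:endOfStream','Stream closed before end of line.');
    end
    byteArray(end+1) = b; %#ok<AGROW>
end
line = char(byteArray(1:end-2));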
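The push-phase loop above only displays each line. As a sketch of what a handler might do with it instead, the hypothetical function below splits a line shaped like the samples in the question (for example 1,3|15356.4|TRADEABLE|0.5714285) into its parts. Reading the leading pair of numbers as a table number and an item number is an assumption made from those samples, not something confirmed in this thread, and the name parseupdate is invented for illustration. Any line that does not match the pattern is returned with isUpdate set to false.

```matlab
function update = parseupdate(line)
% PARSEUPDATE  Split a push-phase line such as '1,3|15356.4|TRADEABLE|0.57'.
% Hypothetical helper; the field meanings are assumed from the sample output.
update = struct('isUpdate',false,'table',NaN,'item',NaN,'fields',{{}});
tok = regexp(line,'^(\d+),(\d+)\|(.*)$','tokens','once');
if isempty(tok)
    return   % not an update line; leave the defaults in place
end
update.isUpdate = true;
update.table = str2double(tok{1});
update.item  = str2double(tok{2});
% Split on '|' and keep empty fields so that positions stay aligned.
update.fields = regexp(tok{3},'\|','split');
end
```

Inside urlreadls, disp(line) could then become something like onUpdate(parseupdate(line)), where onUpdate is a caller-supplied function handle (again a hypothetical name). Empty fields are kept as empty strings rather than dropped, so a handler can tell which positions carried no value.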
  1 Comment
Andrea Giovannetti on 9 Nov 2015
That's amazing. I was wondering whether someone had taken on the burden of working on urlread2 for that precise purpose. My problem is actually rather simpler, but I have still had no success in tackling it. I am stuck working with LS_polling; more precisely, I am trying to keep a standardized output, so that the lack of a value update (due to MERGE) still produces some "blank" output, which I can interpret as a "no-update" condition... May I ask you for a suggestion? Thanks,
