aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--backend/api/api/Controllers/WebSocketController.cs63
-rw-r--r--backend/api/api/Program.cs10
-rw-r--r--backend/api/api/Services/IMLWebSocketService.cs7
-rw-r--r--backend/api/api/Services/MLWebSocketService.cs68
-rw-r--r--backend/api/api/Services/MlConnectionService.cs3
-rw-r--r--backend/microservice/__pycache__/mlservice.cpython-310.pycbin0 -> 5009 bytes
-rw-r--r--backend/microservice/ml_socket.py25
-rw-r--r--frontend/src/app/_services/web-socket.service.spec.ts16
-rw-r--r--frontend/src/app/_services/web-socket.service.ts39
9 files changed, 230 insertions, 1 deletions
diff --git a/backend/api/api/Controllers/WebSocketController.cs b/backend/api/api/Controllers/WebSocketController.cs
new file mode 100644
index 00000000..184b47e7
--- /dev/null
+++ b/backend/api/api/Controllers/WebSocketController.cs
@@ -0,0 +1,63 @@
+using api.Services;
+using Microsoft.AspNetCore.Http;
+using Microsoft.AspNetCore.Mvc;
+using System.Net.WebSockets;
+
+namespace api.Controllers
+{
+ [Route("api")]
+ [ApiController]
+ public class WebSocketController : ControllerBase
+ {
+ private IMLWebSocketService mlWS;
+ public WebSocketController(IMLWebSocketService mlWS)
+ {
+ this.mlWS = mlWS;
+ }
+
+ [HttpGet("wstest")]
+ public string Test()
+ {
+ this.mlWS.Send("ABC123");
+ return "MESSAGE SENT!";
+ }
+
+ [HttpGet("ws")]
+ public async Task Get()
+ {
+ if (HttpContext.WebSockets.IsWebSocketRequest)
+ {
+ using var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
+ await Echo(webSocket);
+ }
+ else
+ {
+ HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
+ }
+ }
+
+ private static async Task Echo(WebSocket webSocket)
+ {
+ var buffer = new byte[1024 * 4];
+ var receiveResult = await webSocket.ReceiveAsync(
+ new ArraySegment<byte>(buffer), CancellationToken.None);
+
+ while (!receiveResult.CloseStatus.HasValue)
+ {
+ await webSocket.SendAsync(
+ new ArraySegment<byte>(buffer, 0, receiveResult.Count),
+ receiveResult.MessageType,
+ receiveResult.EndOfMessage,
+ CancellationToken.None);
+
+ receiveResult = await webSocket.ReceiveAsync(
+ new ArraySegment<byte>(buffer), CancellationToken.None);
+ }
+
+ await webSocket.CloseAsync(
+ receiveResult.CloseStatus.Value,
+ receiveResult.CloseStatusDescription,
+ CancellationToken.None);
+ }
+ }
+}
diff --git a/backend/api/api/Program.cs b/backend/api/api/Program.cs
index c8cf0784..5913c2d3 100644
--- a/backend/api/api/Program.cs
+++ b/backend/api/api/Program.cs
@@ -33,6 +33,10 @@ builder.Services.AddScoped<IModelService, ModelService>();
builder.Services.AddScoped<IPredictorService, PredictorService>();
builder.Services.AddScoped<IFileService, FileService>();
+var mlwss = new MLWebSocketService();
+
+builder.Services.AddSingleton<IMLWebSocketService>(mlwss);
+builder.Services.AddHostedService(_ => mlwss);
builder.Services.AddHostedService<TempFileService>();
@@ -54,6 +58,12 @@ builder.Services.AddControllers();
var app = builder.Build();
+var webSocketOptions = new WebSocketOptions
+{
+ KeepAliveInterval = TimeSpan.FromMinutes(2)
+};
+
+app.UseWebSockets(webSocketOptions);
//Add Cors
app.UseCors(
diff --git a/backend/api/api/Services/IMLWebSocketService.cs b/backend/api/api/Services/IMLWebSocketService.cs
new file mode 100644
index 00000000..52efb7fc
--- /dev/null
+++ b/backend/api/api/Services/IMLWebSocketService.cs
@@ -0,0 +1,7 @@
+namespace api.Services
+{
+ public interface IMLWebSocketService
+ {
+ void Send(string message);
+ }
+}
diff --git a/backend/api/api/Services/MLWebSocketService.cs b/backend/api/api/Services/MLWebSocketService.cs
new file mode 100644
index 00000000..ca6919bf
--- /dev/null
+++ b/backend/api/api/Services/MLWebSocketService.cs
@@ -0,0 +1,68 @@
+using System.Net.WebSockets;
+using System.Text;
+
+namespace api.Services
+{
+ public class MLWebSocketService: BackgroundService, IMLWebSocketService
+ {
+ private static readonly string Connection = "ws://localhost:5027";
+
+ private Queue<string> dataQueue = new Queue<string>();
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ while (!stoppingToken.IsCancellationRequested)
+ using (var socket = new ClientWebSocket())
+ try
+ {
+ await socket.ConnectAsync(new Uri(Connection), stoppingToken);
+
+
+ while(dataQueue.Count > 0)
+ {
+ await Send(socket, dataQueue.Dequeue(), stoppingToken);
+ }
+
+ Receive(socket, stoppingToken);
+
+ await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", stoppingToken);
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"ERROR - {ex.Message}");
+ }
+ }
+
+ private async Task Send(ClientWebSocket socket, string data, CancellationToken stoppingToken) =>
+ await socket.SendAsync(Encoding.UTF8.GetBytes(data), WebSocketMessageType.Text, true, stoppingToken);
+
+ private async Task Receive(ClientWebSocket socket, CancellationToken stoppingToken)
+ {
+ var buffer = new ArraySegment<byte>(new byte[2048]);
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ WebSocketReceiveResult result;
+ using (var ms = new MemoryStream())
+ {
+ do
+ {
+ result = await socket.ReceiveAsync(buffer, stoppingToken);
+ ms.Write(buffer.Array, buffer.Offset, result.Count);
+ } while (!result.EndOfMessage);
+
+ if (result.MessageType == WebSocketMessageType.Close)
+ break;
+
+ ms.Seek(0, SeekOrigin.Begin);
+ using (var reader = new StreamReader(ms, Encoding.UTF8))
+ Console.WriteLine(await reader.ReadToEndAsync());
+ }
+ };
+ }
+
+ public void Send(string data)
+ {
+ dataQueue.Enqueue(data);
+ }
+ }
+}
diff --git a/backend/api/api/Services/MlConnectionService.cs b/backend/api/api/Services/MlConnectionService.cs
index 7adade0c..9b167537 100644
--- a/backend/api/api/Services/MlConnectionService.cs
+++ b/backend/api/api/Services/MlConnectionService.cs
@@ -1,4 +1,6 @@
using RestSharp;
+using System.Net.WebSockets;
+using System.Text;
namespace api.Services
{
@@ -11,7 +13,6 @@ namespace api.Services
request.AddJsonBody(model);
var result = await client.ExecuteAsync(request);
return result.Content;//Response od ML microservisa
-
}
}
}
diff --git a/backend/microservice/__pycache__/mlservice.cpython-310.pyc b/backend/microservice/__pycache__/mlservice.cpython-310.pyc
new file mode 100644
index 00000000..c079459a
--- /dev/null
+++ b/backend/microservice/__pycache__/mlservice.cpython-310.pyc
Binary files differ
diff --git a/backend/microservice/ml_socket.py b/backend/microservice/ml_socket.py
new file mode 100644
index 00000000..5489b787
--- /dev/null
+++ b/backend/microservice/ml_socket.py
@@ -0,0 +1,25 @@
+import asyncio
+import websockets
+import json
+
+def get_or_create_eventloop():
+ try:
+ return asyncio.get_event_loop()
+ except RuntimeError as ex:
+ if "There is no current event loop in thread" in str(ex):
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ return asyncio.get_event_loop()
+
+# create handler for each connection
+async def handler(websocket, path):
+ #data = json.loads(await websocket.recv())
+ #reply = f"Data recieved as: {data}!"
+ #print(data['test'])
+ msg = await websocket.recv()
+ await websocket.send("[" + msg + "]")
+
+start_server = websockets.serve(handler, "localhost", 5027)
+
+get_or_create_eventloop().run_until_complete(start_server)
+get_or_create_eventloop().run_forever() \ No newline at end of file
diff --git a/frontend/src/app/_services/web-socket.service.spec.ts b/frontend/src/app/_services/web-socket.service.spec.ts
new file mode 100644
index 00000000..a86aeca7
--- /dev/null
+++ b/frontend/src/app/_services/web-socket.service.spec.ts
@@ -0,0 +1,16 @@
+import { TestBed } from '@angular/core/testing';
+
+import { WebSocketService } from './web-socket.service';
+
+describe('WebSocketService', () => {
+ let service: WebSocketService;
+
+ beforeEach(() => {
+ TestBed.configureTestingModule({});
+ service = TestBed.inject(WebSocketService);
+ });
+
+ it('should be created', () => {
+ expect(service).toBeTruthy();
+ });
+});
diff --git a/frontend/src/app/_services/web-socket.service.ts b/frontend/src/app/_services/web-socket.service.ts
new file mode 100644
index 00000000..fc292a62
--- /dev/null
+++ b/frontend/src/app/_services/web-socket.service.ts
@@ -0,0 +1,39 @@
+import { Injectable } from '@angular/core';
+import { API_SETTINGS } from 'src/config';
+import { ConstantBackoff, Websocket, WebsocketBuilder } from 'websocket-ts/lib';
+
+@Injectable({
+ providedIn: 'root'
+})
+export class WebSocketService {
+
+ ws?: Websocket;
+
+ private handlers: Function[] = [];
+
+ constructor() {
+ this.ws = new WebsocketBuilder(API_SETTINGS.apiWSUrl)
+ .withBackoff(new ConstantBackoff(30000))
+ .onOpen((i, e) => { console.log('WS: Connected to ' + API_SETTINGS.apiWSUrl) })
+ .onMessage((i, e) => {
+ console.log('WS MESSAGE: ', e.data);
+ this.handlers.forEach(handler => {
+ handler(e.data);
+ })
+ })
+ .onClose((i, e) => { console.log('WS: Connection closed!') })
+ .build();
+ }
+
+ send(msg: string) {
+ this.ws?.send(msg);
+ }
+
+ addHandler(handler: Function) {
+ this.handlers.push(handler);
+ }
+
+ removeHandler(handler: Function) {
+ this.handlers.splice(this.handlers.indexOf(handler), 1);
+ }
+}