diff options
-rw-r--r-- | backend/api/api/Controllers/WebSocketController.cs | 63 | ||||
-rw-r--r-- | backend/api/api/Program.cs | 10 | ||||
-rw-r--r-- | backend/api/api/Services/IMLWebSocketService.cs | 7 | ||||
-rw-r--r-- | backend/api/api/Services/MLWebSocketService.cs | 68 | ||||
-rw-r--r-- | backend/api/api/Services/MlConnectionService.cs | 3 | ||||
-rw-r--r-- | backend/microservice/__pycache__/mlservice.cpython-310.pyc | bin | 0 -> 5009 bytes | |||
-rw-r--r-- | backend/microservice/ml_socket.py | 25 | ||||
-rw-r--r-- | frontend/src/app/_services/web-socket.service.spec.ts | 16 | ||||
-rw-r--r-- | frontend/src/app/_services/web-socket.service.ts | 39 |
9 files changed, 230 insertions, 1 deletions
diff --git a/backend/api/api/Controllers/WebSocketController.cs b/backend/api/api/Controllers/WebSocketController.cs new file mode 100644 index 00000000..184b47e7 --- /dev/null +++ b/backend/api/api/Controllers/WebSocketController.cs @@ -0,0 +1,63 @@ +using api.Services; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using System.Net.WebSockets; + +namespace api.Controllers +{ + [Route("api")] + [ApiController] + public class WebSocketController : ControllerBase + { + private IMLWebSocketService mlWS; + public WebSocketController(IMLWebSocketService mlWS) + { + this.mlWS = mlWS; + } + + [HttpGet("wstest")] + public string Test() + { + this.mlWS.Send("ABC123"); + return "MESSAGE SENT!"; + } + + [HttpGet("ws")] + public async Task Get() + { + if (HttpContext.WebSockets.IsWebSocketRequest) + { + using var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); + await Echo(webSocket); + } + else + { + HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest; + } + } + + private static async Task Echo(WebSocket webSocket) + { + var buffer = new byte[1024 * 4]; + var receiveResult = await webSocket.ReceiveAsync( + new ArraySegment<byte>(buffer), CancellationToken.None); + + while (!receiveResult.CloseStatus.HasValue) + { + await webSocket.SendAsync( + new ArraySegment<byte>(buffer, 0, receiveResult.Count), + receiveResult.MessageType, + receiveResult.EndOfMessage, + CancellationToken.None); + + receiveResult = await webSocket.ReceiveAsync( + new ArraySegment<byte>(buffer), CancellationToken.None); + } + + await webSocket.CloseAsync( + receiveResult.CloseStatus.Value, + receiveResult.CloseStatusDescription, + CancellationToken.None); + } + } +} diff --git a/backend/api/api/Program.cs b/backend/api/api/Program.cs index c8cf0784..5913c2d3 100644 --- a/backend/api/api/Program.cs +++ b/backend/api/api/Program.cs @@ -33,6 +33,10 @@ builder.Services.AddScoped<IModelService, ModelService>(); builder.Services.AddScoped<IPredictorService, PredictorService>(); builder.Services.AddScoped<IFileService, FileService>(); +var mlwss = new MLWebSocketService(); + +builder.Services.AddSingleton<IMLWebSocketService>(mlwss); +builder.Services.AddHostedService(_ => mlwss); builder.Services.AddHostedService<TempFileService>(); @@ -54,6 +58,12 @@ builder.Services.AddControllers(); var app = builder.Build(); +var webSocketOptions = new WebSocketOptions +{ + KeepAliveInterval = TimeSpan.FromMinutes(2) +}; + +app.UseWebSockets(webSocketOptions); //Add Cors app.UseCors( diff --git a/backend/api/api/Services/IMLWebSocketService.cs b/backend/api/api/Services/IMLWebSocketService.cs new file mode 100644 index 00000000..52efb7fc --- /dev/null +++ b/backend/api/api/Services/IMLWebSocketService.cs @@ -0,0 +1,7 @@ +namespace api.Services +{ + public interface IMLWebSocketService + { + void Send(string message); + } +} diff --git a/backend/api/api/Services/MLWebSocketService.cs b/backend/api/api/Services/MLWebSocketService.cs new file mode 100644 index 00000000..ca6919bf --- /dev/null +++ b/backend/api/api/Services/MLWebSocketService.cs @@ -0,0 +1,68 @@ +using System.Net.WebSockets; +using System.Text; + +namespace api.Services +{ + public class MLWebSocketService: BackgroundService, IMLWebSocketService + { + private static readonly string Connection = "ws://localhost:5027"; + + private Queue<string> dataQueue = new Queue<string>(); + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + using (var socket = new ClientWebSocket()) + try + { + await socket.ConnectAsync(new Uri(Connection), stoppingToken); + + + while(dataQueue.Count > 0) + { + await Send(socket, dataQueue.Dequeue(), stoppingToken); + } + + Receive(socket, stoppingToken); + + await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", stoppingToken); + } + catch (Exception ex) + { + Console.WriteLine($"ERROR - {ex.Message}"); + } + } + + private async Task Send(ClientWebSocket socket, string data, CancellationToken stoppingToken) => + await socket.SendAsync(Encoding.UTF8.GetBytes(data), WebSocketMessageType.Text, true, stoppingToken); + + private async Task Receive(ClientWebSocket socket, CancellationToken stoppingToken) + { + var buffer = new ArraySegment<byte>(new byte[2048]); + while (!stoppingToken.IsCancellationRequested) + { + WebSocketReceiveResult result; + using (var ms = new MemoryStream()) + { + do + { + result = await socket.ReceiveAsync(buffer, stoppingToken); + ms.Write(buffer.Array, buffer.Offset, result.Count); + } while (!result.EndOfMessage); + + if (result.MessageType == WebSocketMessageType.Close) + break; + + ms.Seek(0, SeekOrigin.Begin); + using (var reader = new StreamReader(ms, Encoding.UTF8)) + Console.WriteLine(await reader.ReadToEndAsync()); + } + }; + } + + public void Send(string data) + { + dataQueue.Enqueue(data); + } + } +} diff --git a/backend/api/api/Services/MlConnectionService.cs b/backend/api/api/Services/MlConnectionService.cs index 7adade0c..9b167537 100644 --- a/backend/api/api/Services/MlConnectionService.cs +++ b/backend/api/api/Services/MlConnectionService.cs @@ -1,4 +1,6 @@ using RestSharp; +using System.Net.WebSockets; +using System.Text; namespace api.Services { @@ -11,7 +13,6 @@ namespace api.Services request.AddJsonBody(model); var result = await client.ExecuteAsync(request); return result.Content;//Response od ML microservisa - } } } diff --git a/backend/microservice/__pycache__/mlservice.cpython-310.pyc b/backend/microservice/__pycache__/mlservice.cpython-310.pyc Binary files differnew file mode 100644 index 00000000..c079459a --- /dev/null +++ b/backend/microservice/__pycache__/mlservice.cpython-310.pyc diff --git a/backend/microservice/ml_socket.py b/backend/microservice/ml_socket.py new file mode 100644 index 00000000..5489b787 --- /dev/null +++ b/backend/microservice/ml_socket.py @@ -0,0 +1,25 @@ +import asyncio +import websockets +import json + +def get_or_create_eventloop(): + try: + return asyncio.get_event_loop() + except RuntimeError as ex: + if "There is no current event loop in thread" in str(ex): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return asyncio.get_event_loop() + +# create handler for each connection +async def handler(websocket, path): + #data = json.loads(await websocket.recv()) + #reply = f"Data recieved as: {data}!" + #print(data['test']) + msg = await websocket.recv() + await websocket.send("[" + msg + "]") + +start_server = websockets.serve(handler, "localhost", 5027) + +get_or_create_eventloop().run_until_complete(start_server) +get_or_create_eventloop().run_forever()
\ No newline at end of file diff --git a/frontend/src/app/_services/web-socket.service.spec.ts b/frontend/src/app/_services/web-socket.service.spec.ts new file mode 100644 index 00000000..a86aeca7 --- /dev/null +++ b/frontend/src/app/_services/web-socket.service.spec.ts @@ -0,0 +1,16 @@ +import { TestBed } from '@angular/core/testing'; + +import { WebSocketService } from './web-socket.service'; + +describe('WebSocketService', () => { + let service: WebSocketService; + + beforeEach(() => { + TestBed.configureTestingModule({}); + service = TestBed.inject(WebSocketService); + }); + + it('should be created', () => { + expect(service).toBeTruthy(); + }); +}); diff --git a/frontend/src/app/_services/web-socket.service.ts b/frontend/src/app/_services/web-socket.service.ts new file mode 100644 index 00000000..fc292a62 --- /dev/null +++ b/frontend/src/app/_services/web-socket.service.ts @@ -0,0 +1,39 @@ +import { Injectable } from '@angular/core'; +import { API_SETTINGS } from 'src/config'; +import { ConstantBackoff, Websocket, WebsocketBuilder } from 'websocket-ts/lib'; + +@Injectable({ + providedIn: 'root' +}) +export class WebSocketService { + + ws?: Websocket; + + private handlers: Function[] = []; + + constructor() { + this.ws = new WebsocketBuilder(API_SETTINGS.apiWSUrl) + .withBackoff(new ConstantBackoff(30000)) + .onOpen((i, e) => { console.log('WS: Connected to ' + API_SETTINGS.apiWSUrl) }) + .onMessage((i, e) => { + console.log('WS MESSAGE: ', e.data); + this.handlers.forEach(handler => { + handler(e.data); + }) + }) + .onClose((i, e) => { console.log('WS: Connection closed!') }) + .build(); + } + + send(msg: string) { + this.ws?.send(msg); + } + + addHandler(handler: Function) { + this.handlers.push(handler); + } + + removeHandler(handler: Function) { + this.handlers.splice(this.handlers.indexOf(handler), 1); + } +} |