open! Core
open! Async
open Bonsai_chat_open_source_common

(** Pipe RPC implementation for [Protocol.Message_stream.t].

    Each call subscribes to a read-only view of the message bus held in
    [global_state] and answers with the resulting pipe, wrapped in [Ok]. The
    per-connection state and the unit query are ignored. *)
let message_stream global_state =
  let bus_read = Bus.read_only global_state.Global_state.message_bus in
  let f _user_state () =
    Async_bus.pipe1_exn bus_read [%here] |> Deferred.Result.return
  in
  Rpc.Pipe_rpc.implement Protocol.Message_stream.t f
;;

(** RPC implementation for [Protocol.Messages_request.t].

    Looks the requested room up in the message table of [global_state] and
    answers with the contents of its queue as a list. A room that is not in
    the table yields the empty list rather than an error. *)
let message_request global_state =
  let messages = global_state.Global_state.messages in
  let f _user_state room =
    match Room.Table.find messages room with
    | None -> return []
    | Some q -> return (Queue.to_list q)
  in
  Rpc.Rpc.implement Protocol.Messages_request.t f
;;

(** RPC implementation for [Protocol.Send_message.t].

    The [author] field of the incoming message is overwritten with the user
    taken from the connection state, so whatever author the client supplied
    is discarded. The message is then appended to the queue of its room and
    written to the message bus. If the room is not in the table the call
    answers with a "room not found" error and nothing is enqueued or
    written. *)
let send_message global_state =
  let messages = global_state.Global_state.messages in
  let bus = global_state.Global_state.message_bus in
  let f user_state message =
    let author = user_state.User_state.user in
    let message = { message with Message.author } in
    let room = Message.room message in
    match Room.Table.find messages room with
    | Some messages ->
      (* Store first, then publish on the bus. *)
      Queue.enqueue messages message;
      Bus.write bus message;
      Deferred.Or_error.return ()
    | None ->
      Deferred.Or_error.error_s [%message "room not found" (room : Room.t)]
  in
  Rpc.Rpc.implement Protocol.Send_message.t f
;;

(** RPC implementation for [Protocol.Create_room.t].

    Adds the room to the message table with a freshly created empty queue.
    If the table already has an entry for the room, the call answers with a
    "duplicate room" error. *)
let create_room global_state =
  let messages = global_state.Global_state.messages in
  let f _user_state room =
    match Room.Table.add messages ~key:room ~data:(Queue.create ()) with
    | `Ok -> Deferred.Or_error.return ()
    | `Duplicate ->
      Deferred.Or_error.error_s [%message "duplicate room" (room : Room.t)]
  in
  Rpc.Rpc.implement Protocol.Create_room.t f
;;

(** RPC implementation for [Protocol.List_rooms.t].

    Answers with the keys currently present in the message table of
    [global_state]. *)
let list_rooms global_state =
  let messages = global_state.Global_state.messages in
  let f _user_state () = return (Room.Table.keys messages) in
  Rpc.Rpc.implement Protocol.List_rooms.t f
;;

(** Bundles the five implementations above, all sharing [global_state], into
    one [Rpc.Implementations.t].

    Unknown RPCs are handled with [`Continue]. NOTE(review): presumably this
    keeps the connection usable instead of closing it -- confirm against the
    Async_rpc documentation. [create_exn] may raise; its failure conditions
    are not visible from this file. *)
let implementations global_state =
  Rpc.Implementations.create_exn
    ~implementations:
      [ message_stream global_state
      ; message_request global_state
      ; send_message global_state
      ; create_room global_state
      ; list_rooms global_state
      ]
    ~on_unknown_rpc:`Continue
;;