From b83479e1b6700d69af2aa530b90a82e8ab538ce5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=91=D0=B0=D0=B3?= =?UTF-8?q?=D1=80=D1=8F=D0=BD=D1=86=D0=B5=D0=B2?= Date: Tue, 28 Apr 2020 22:34:04 +0300 Subject: [PATCH] Updated Input and Output Pipe arguments. Derived them from PipieArgument --- .../Argument/Atoms/InputPipeArgument.cs | 45 +++++-------------- .../Argument/Atoms/OutputPipeArgument.cs | 44 +++++------------- .../FFMPEG/Argument/Atoms/PipeArgument.cs | 45 +++++++++++++++++++ FFMpegCore/FFMPEG/Pipes/IPipeDataReader.cs | 1 + .../FFMPEG/Pipes/StreamPipeDataReader.cs | 10 ++++- 5 files changed, 77 insertions(+), 68 deletions(-) create mode 100644 FFMpegCore/FFMPEG/Argument/Atoms/PipeArgument.cs diff --git a/FFMpegCore/FFMPEG/Argument/Atoms/InputPipeArgument.cs b/FFMpegCore/FFMPEG/Argument/Atoms/InputPipeArgument.cs index cd818d8..6197a23 100644 --- a/FFMpegCore/FFMPEG/Argument/Atoms/InputPipeArgument.cs +++ b/FFMpegCore/FFMPEG/Argument/Atoms/InputPipeArgument.cs @@ -6,6 +6,7 @@ using System.IO; using System.IO.Pipes; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace FFMpegCore.FFMPEG.Argument @@ -13,50 +14,26 @@ namespace FFMpegCore.FFMPEG.Argument /// /// Represents input parameter for a named pipe /// - public class InputPipeArgument : Argument + public class InputPipeArgument : PipeArgument { - public string PipeName { get; private set; } - public string PipePath => PipeHelpers.GetPipePath(PipeName); - public IPipeDataWriter Source { get; private set; } + public IPipeDataWriter Writer { get; private set; } - private NamedPipeServerStream pipe; - - public InputPipeArgument(IPipeDataWriter source) + public InputPipeArgument(IPipeDataWriter writer) : base(PipeDirection.Out) { - Source = source; - PipeName = PipeHelpers.GetUnqiuePipeName(); - } - - public void OpenPipe() - { - if (pipe != null) - throw new InvalidOperationException("Pipe already has been opened"); - - pipe = new NamedPipeServerStream(PipeName); - } - - public void ClosePipe() - { - pipe?.Dispose(); - pipe = null; + Writer = writer; } public override string GetStringValue() { - return $"-y {Source.GetFormat()} -i \"{PipePath}\""; + return $"-y {Writer.GetFormat()} -i \"{PipePath}\""; } - public void FlushPipe() + public override async Task ProcessDataAsync(CancellationToken token) { - pipe.WaitForConnection(); - Source.WriteData(pipe); - } - - - public async Task FlushPipeAsync() - { - await pipe.WaitForConnectionAsync(); - await Source.WriteDataAsync(pipe); + await Pipe.WaitForConnectionAsync(token).ConfigureAwait(false); + if (!Pipe.IsConnected) + throw new TaskCanceledException(); + await Writer.WriteDataAsync(Pipe).ConfigureAwait(false); } } } diff --git a/FFMpegCore/FFMPEG/Argument/Atoms/OutputPipeArgument.cs b/FFMpegCore/FFMPEG/Argument/Atoms/OutputPipeArgument.cs index 201e3d0..fd02df2 100644 --- a/FFMpegCore/FFMPEG/Argument/Atoms/OutputPipeArgument.cs +++ b/FFMpegCore/FFMPEG/Argument/Atoms/OutputPipeArgument.cs @@ -3,51 +3,31 @@ using System.Collections.Generic; using System.IO.Pipes; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace FFMpegCore.FFMPEG.Argument { - public class OutputPipeArgument : Argument + public class OutputPipeArgument : PipeArgument { - public string PipeName { get; private set; } - public string PipePath => PipeHelpers.GetPipePath(PipeName); public IPipeDataReader Reader { get; private set; } - private NamedPipeClientStream pipe; - - public OutputPipeArgument(IPipeDataReader reader) + public OutputPipeArgument(IPipeDataReader reader) : base(PipeDirection.In) { Reader = reader; - PipeName = PipeHelpers.GetUnqiuePipeName(); - } - - public void OpenPipe() - { - if(pipe != null) - throw new InvalidOperationException("Pipe already has been opened"); - - pipe = new NamedPipeClientStream(PipePath); - } - - public void ReadData() - { - Reader.ReadData(pipe); - } - - public Task ReadDataAsync() - { - return Reader.ReadDataAsync(pipe); - } - - public void Close() - { - pipe?.Dispose(); - pipe = null; } public override string GetStringValue() { - return $"\"{PipePath}\""; + return $"\"{PipePath}\" -y"; + } + + public override async Task ProcessDataAsync(CancellationToken token) + { + await Pipe.WaitForConnectionAsync(token).ConfigureAwait(false); + if (!Pipe.IsConnected) + throw new TaskCanceledException(); + await Reader.ReadDataAsync(Pipe).ConfigureAwait(false); } } } diff --git a/FFMpegCore/FFMPEG/Argument/Atoms/PipeArgument.cs b/FFMpegCore/FFMPEG/Argument/Atoms/PipeArgument.cs new file mode 100644 index 0000000..81fb872 --- /dev/null +++ b/FFMpegCore/FFMPEG/Argument/Atoms/PipeArgument.cs @@ -0,0 +1,45 @@ +using FFMpegCore.FFMPEG.Pipes; +using System; +using System.Collections.Generic; +using System.IO.Pipes; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace FFMpegCore.FFMPEG.Argument +{ + public abstract class PipeArgument : Argument + { + public string PipeName { get; private set; } + public string PipePath => PipeHelpers.GetPipePath(PipeName); + + protected NamedPipeServerStream Pipe { get; private set; } + private PipeDirection direction; + + protected PipeArgument(PipeDirection direction) + { + PipeName = PipeHelpers.GetUnqiuePipeName(); + this.direction = direction; + } + + public void OpenPipe() + { + if (Pipe != null) + throw new InvalidOperationException("Pipe already has been opened"); + + Pipe = new NamedPipeServerStream(PipeName, direction, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); + } + + public void ClosePipe() + { + Pipe?.Dispose(); + Pipe = null; + } + public Task ProcessDataAsync() + { + return ProcessDataAsync(CancellationToken.None); + } + + public abstract Task ProcessDataAsync(CancellationToken token); + } +} diff --git a/FFMpegCore/FFMPEG/Pipes/IPipeDataReader.cs b/FFMpegCore/FFMPEG/Pipes/IPipeDataReader.cs index eeb5734..3912cb3 100644 --- a/FFMpegCore/FFMPEG/Pipes/IPipeDataReader.cs +++ b/FFMpegCore/FFMPEG/Pipes/IPipeDataReader.cs @@ -9,5 +9,6 @@ public interface IPipeDataReader { void ReadData(System.IO.Stream stream); Task ReadDataAsync(System.IO.Stream stream); + string GetFormat(); } } diff --git a/FFMpegCore/FFMPEG/Pipes/StreamPipeDataReader.cs b/FFMpegCore/FFMPEG/Pipes/StreamPipeDataReader.cs index d080806..372d227 100644 --- a/FFMpegCore/FFMPEG/Pipes/StreamPipeDataReader.cs +++ b/FFMpegCore/FFMPEG/Pipes/StreamPipeDataReader.cs @@ -9,8 +9,9 @@ public class StreamPipeDataReader : IPipeDataReader { public System.IO.Stream DestanationStream { get; private set; } public int BlockSize { get; set; } = 4096; + public string Format { get; set; } = string.Empty; - public StreamPipeDataReader(System.IO. Stream destanationStream) + public StreamPipeDataReader(System.IO.Stream destanationStream) { DestanationStream = destanationStream; } @@ -19,7 +20,7 @@ public void ReadData(System.IO.Stream stream) { int read; var buffer = new byte[BlockSize]; - while((read = stream.Read(buffer, 0, buffer.Length)) != 0) + while ((read = stream.Read(buffer, 0, buffer.Length)) != 0) DestanationStream.Write(buffer, 0, buffer.Length); } @@ -30,5 +31,10 @@ public async Task ReadDataAsync(System.IO.Stream stream) while ((read = await stream.ReadAsync(buffer, 0, buffer.Length)) != 0) await DestanationStream.WriteAsync(buffer, 0, buffer.Length); } + + public string GetFormat() + { + return Format; + } } }