Updated Input and Output Pipe arguments. Derived them from PipieArgument

Former-commit-id: b83479e1b6
This commit is contained in:
Максим Багрянцев 2020-04-28 22:34:04 +03:00
parent e462b424eb
commit b5361b69b0
5 changed files with 77 additions and 68 deletions

View file

@ -6,6 +6,7 @@
using System.IO; using System.IO;
using System.IO.Pipes; using System.IO.Pipes;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace FFMpegCore.FFMPEG.Argument namespace FFMpegCore.FFMPEG.Argument
@ -13,50 +14,26 @@ namespace FFMpegCore.FFMPEG.Argument
/// <summary> /// <summary>
/// Represents input parameter for a named pipe /// Represents input parameter for a named pipe
/// </summary> /// </summary>
public class InputPipeArgument : Argument public class InputPipeArgument : PipeArgument
{ {
public string PipeName { get; private set; } public IPipeDataWriter Writer { get; private set; }
public string PipePath => PipeHelpers.GetPipePath(PipeName);
public IPipeDataWriter Source { get; private set; }
private NamedPipeServerStream pipe; public InputPipeArgument(IPipeDataWriter writer) : base(PipeDirection.Out)
public InputPipeArgument(IPipeDataWriter source)
{ {
Source = source; Writer = writer;
PipeName = PipeHelpers.GetUnqiuePipeName();
}
public void OpenPipe()
{
if (pipe != null)
throw new InvalidOperationException("Pipe already has been opened");
pipe = new NamedPipeServerStream(PipeName);
}
public void ClosePipe()
{
pipe?.Dispose();
pipe = null;
} }
public override string GetStringValue() public override string GetStringValue()
{ {
return $"-y {Source.GetFormat()} -i \"{PipePath}\""; return $"-y {Writer.GetFormat()} -i \"{PipePath}\"";
} }
public void FlushPipe() public override async Task ProcessDataAsync(CancellationToken token)
{ {
pipe.WaitForConnection(); await Pipe.WaitForConnectionAsync(token).ConfigureAwait(false);
Source.WriteData(pipe); if (!Pipe.IsConnected)
} throw new TaskCanceledException();
await Writer.WriteDataAsync(Pipe).ConfigureAwait(false);
public async Task FlushPipeAsync()
{
await pipe.WaitForConnectionAsync();
await Source.WriteDataAsync(pipe);
} }
} }
} }

View file

@ -3,51 +3,31 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.IO.Pipes; using System.IO.Pipes;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace FFMpegCore.FFMPEG.Argument namespace FFMpegCore.FFMPEG.Argument
{ {
public class OutputPipeArgument : Argument public class OutputPipeArgument : PipeArgument
{ {
public string PipeName { get; private set; }
public string PipePath => PipeHelpers.GetPipePath(PipeName);
public IPipeDataReader Reader { get; private set; } public IPipeDataReader Reader { get; private set; }
private NamedPipeClientStream pipe; public OutputPipeArgument(IPipeDataReader reader) : base(PipeDirection.In)
public OutputPipeArgument(IPipeDataReader reader)
{ {
Reader = reader; Reader = reader;
PipeName = PipeHelpers.GetUnqiuePipeName();
}
public void OpenPipe()
{
if(pipe != null)
throw new InvalidOperationException("Pipe already has been opened");
pipe = new NamedPipeClientStream(PipePath);
}
public void ReadData()
{
Reader.ReadData(pipe);
}
public Task ReadDataAsync()
{
return Reader.ReadDataAsync(pipe);
}
public void Close()
{
pipe?.Dispose();
pipe = null;
} }
public override string GetStringValue() public override string GetStringValue()
{ {
return $"\"{PipePath}\""; return $"\"{PipePath}\" -y";
}
public override async Task ProcessDataAsync(CancellationToken token)
{
await Pipe.WaitForConnectionAsync(token).ConfigureAwait(false);
if (!Pipe.IsConnected)
throw new TaskCanceledException();
await Reader.ReadDataAsync(Pipe).ConfigureAwait(false);
} }
} }
} }

View file

@ -0,0 +1,45 @@
using FFMpegCore.FFMPEG.Pipes;
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace FFMpegCore.FFMPEG.Argument
{
public abstract class PipeArgument : Argument
{
public string PipeName { get; private set; }
public string PipePath => PipeHelpers.GetPipePath(PipeName);
protected NamedPipeServerStream Pipe { get; private set; }
private PipeDirection direction;
protected PipeArgument(PipeDirection direction)
{
PipeName = PipeHelpers.GetUnqiuePipeName();
this.direction = direction;
}
public void OpenPipe()
{
if (Pipe != null)
throw new InvalidOperationException("Pipe already has been opened");
Pipe = new NamedPipeServerStream(PipeName, direction, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
}
public void ClosePipe()
{
Pipe?.Dispose();
Pipe = null;
}
public Task ProcessDataAsync()
{
return ProcessDataAsync(CancellationToken.None);
}
public abstract Task ProcessDataAsync(CancellationToken token);
}
}

View file

@ -9,5 +9,6 @@ public interface IPipeDataReader
{ {
void ReadData(System.IO.Stream stream); void ReadData(System.IO.Stream stream);
Task ReadDataAsync(System.IO.Stream stream); Task ReadDataAsync(System.IO.Stream stream);
string GetFormat();
} }
} }

View file

@ -9,8 +9,9 @@ public class StreamPipeDataReader : IPipeDataReader
{ {
public System.IO.Stream DestanationStream { get; private set; } public System.IO.Stream DestanationStream { get; private set; }
public int BlockSize { get; set; } = 4096; public int BlockSize { get; set; } = 4096;
public string Format { get; set; } = string.Empty;
public StreamPipeDataReader(System.IO. Stream destanationStream) public StreamPipeDataReader(System.IO.Stream destanationStream)
{ {
DestanationStream = destanationStream; DestanationStream = destanationStream;
} }
@ -19,7 +20,7 @@ public void ReadData(System.IO.Stream stream)
{ {
int read; int read;
var buffer = new byte[BlockSize]; var buffer = new byte[BlockSize];
while((read = stream.Read(buffer, 0, buffer.Length)) != 0) while ((read = stream.Read(buffer, 0, buffer.Length)) != 0)
DestanationStream.Write(buffer, 0, buffer.Length); DestanationStream.Write(buffer, 0, buffer.Length);
} }
@ -30,5 +31,10 @@ public async Task ReadDataAsync(System.IO.Stream stream)
while ((read = await stream.ReadAsync(buffer, 0, buffer.Length)) != 0) while ((read = await stream.ReadAsync(buffer, 0, buffer.Length)) != 0)
await DestanationStream.WriteAsync(buffer, 0, buffer.Length); await DestanationStream.WriteAsync(buffer, 0, buffer.Length);
} }
public string GetFormat()
{
return Format;
}
} }
} }