Attempt at unix pipe support

This commit is contained in:
Malte Rosenbjerg 2020-05-09 20:34:27 +02:00
parent 45f2b54384
commit 6c84396898
8 changed files with 123 additions and 38 deletions

View file

@ -52,7 +52,6 @@ public void Probe_Success_FromStream_Async()
using (var stream = File.OpenRead(VideoLibrary.LocalVideo.FullName)) using (var stream = File.OpenRead(VideoLibrary.LocalVideo.FullName))
{ {
var info = output.ParseVideoInfoAsync(stream).WaitForResult(); var info = output.ParseVideoInfoAsync(stream).WaitForResult();
Assert.AreEqual(13, info.Duration.Seconds); Assert.AreEqual(13, info.Duration.Seconds);
} }
} }

View file

@ -30,10 +30,8 @@ public override string GetStringValue()
public override async Task ProcessDataAsync(CancellationToken token) public override async Task ProcessDataAsync(CancellationToken token)
{ {
await Pipe.WaitForConnectionAsync(token).ConfigureAwait(false); await Pipe.During(token).ConfigureAwait(false);
if (!Pipe.IsConnected) await Writer.WriteDataAsync(Pipe.GetStream()).ConfigureAwait(false);
throw new TaskCanceledException();
await Writer.WriteDataAsync(Pipe).ConfigureAwait(false);
} }
} }
} }

View file

@ -24,10 +24,8 @@ public override string GetStringValue()
public override async Task ProcessDataAsync(CancellationToken token) public override async Task ProcessDataAsync(CancellationToken token)
{ {
await Pipe.WaitForConnectionAsync(token).ConfigureAwait(false); await Pipe.During(token).ConfigureAwait(false);
if (!Pipe.IsConnected) await Reader.ReadDataAsync(Pipe.GetStream()).ConfigureAwait(false);
throw new TaskCanceledException();
await Reader.ReadDataAsync(Pipe).ConfigureAwait(false);
} }
} }
} }

View file

@ -2,38 +2,40 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO.Pipes; using System.IO.Pipes;
using System.Runtime.InteropServices;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Mono.Unix;
namespace FFMpegCore.FFMPEG.Argument namespace FFMpegCore.FFMPEG.Argument
{ {
public abstract class PipeArgument : Argument public abstract class PipeArgument : Argument
{ {
public string PipeName { get; private set; } public string PipePath => Pipe.PipePath;
public string PipePath => PipeHelpers.GetPipePath(PipeName);
protected NamedPipeServerStream Pipe { get; private set; } protected INamedPipe Pipe { get; private set; }
private PipeDirection direction; private PipeDirection direction;
protected PipeArgument(PipeDirection direction) protected PipeArgument(PipeDirection direction)
{ {
PipeName = PipeHelpers.GetUnqiuePipeName(); var pipeName = "FFMpegCore_Pipe_" + Guid.NewGuid();
Pipe = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) switch
{
true => new WindowsNamedPipe(pipeName),
false => new UnixNamedPipe(pipeName)
};
this.direction = direction; this.direction = direction;
} }
public void OpenPipe() public void OpenPipe()
{ {
if (Pipe != null) Pipe.Open(direction);
throw new InvalidOperationException("Pipe already has been opened");
Pipe = new NamedPipeServerStream(PipeName, direction, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
} }
public void ClosePipe() public void ClosePipe()
{ {
Pipe?.Dispose(); Pipe.Close();
Pipe = null;
} }
public Task ProcessDataAsync() public Task ProcessDataAsync()
{ {

View file

@ -0,0 +1,105 @@
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
using Instances;
namespace FFMpegCore.FFMPEG.Argument
{
public interface INamedPipe
{
public void Open(PipeDirection direction);
public Task During(CancellationToken cancellationToken);
public void Close();
System.IO.Stream GetStream();
string PipePath { get; }
}
public class WindowsNamedPipe : INamedPipe
{
private readonly string _pipeName;
public WindowsNamedPipe(string pipeName)
{
_pipeName = pipeName;
}
public void Open(PipeDirection direction)
{
if (Pipe != null)
throw new InvalidOperationException("Pipe already has been opened");
Pipe = new NamedPipeServerStream(_pipeName, direction, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
}
public async Task During(CancellationToken cancellationToken)
{
await Pipe.WaitForConnectionAsync(cancellationToken).ConfigureAwait(false);
if (!Pipe.IsConnected)
throw new TaskCanceledException();
}
public System.IO.Stream GetStream()
{
return Pipe;
}
public NamedPipeServerStream Pipe { get; set; }
public void Close()
{
Pipe?.Dispose();
Pipe = null;
}
public string PipePath => $@"\\.\pipe\{_pipeName}";
}
public class UnixNamedPipe : INamedPipe
{
private readonly string _pipeName;
private PipeDirection _direction;
public UnixNamedPipe(string pipeName)
{
_pipeName = pipeName;
}
public void Open(PipeDirection direction)
{
if (direction == PipeDirection.InOut)
throw new NotImplementedException();
_direction = direction;
if (File.Exists(PipePath))
throw new IOException($"Pipe name is already in use ({PipePath})");
var (exitCode, _) = Instance.Finish("mkfifo", PipePath);
if (exitCode != 0)
throw new IOException($"Could not create FIFO file. (mkfifo failed with argument '{PipePath}')");
}
public Task During(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public void Close()
{
if (!File.Exists(PipePath))
throw new IOException($"Could not find pipe to close");
File.Delete(PipePath);
}
public System.IO.Stream GetStream()
{
return _direction switch
{
PipeDirection.In => File.OpenRead(PipePath),
PipeDirection.Out => File.OpenWrite(PipePath),
_ => throw new NotImplementedException()
};
}
public string PipePath => $"/tmp/CoreFxPipe_FIFO_{_pipeName}";
}
}

View file

@ -121,7 +121,7 @@ public async Task<VideoInfo> ParseVideoInfoAsync(System.IO.Stream stream)
var task = instance.FinishedRunning(); var task = instance.FinishedRunning();
try try
{ {
await pipeArgument.ProcessDataAsync(); await pipeArgument.ProcessDataAsync().ConfigureAwait(false);
pipeArgument.ClosePipe(); pipeArgument.ClosePipe();
} }
catch (IOException) catch (IOException)

View file

@ -1,18 +0,0 @@
using System;
using System.Runtime.InteropServices;
namespace FFMpegCore.FFMPEG.Pipes
{
static class PipeHelpers
{
public static string GetUnqiuePipeName() => "FFMpegCore_Pipe_" + Guid.NewGuid();
public static string GetPipePath(string pipeName)
{
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
return $@"\\.\pipe\{pipeName}";
else
return $"/tmp/CoreFxPipe_{pipeName}"; // dotnet uses unix sockets on unix, for more see https://github.com/dotnet/runtime/issues/24390
}
}
}

View file

@ -32,6 +32,7 @@ Thanks to max619 and WeihanLi
<ItemGroup> <ItemGroup>
<PackageReference Include="Instances" Version="1.5.0" /> <PackageReference Include="Instances" Version="1.5.0" />
<PackageReference Include="Mono.Posix.NETStandard" Version="1.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="System.Drawing.Common" Version="4.5.1" /> <PackageReference Include="System.Drawing.Common" Version="4.5.1" />
</ItemGroup> </ItemGroup>