group various stream processors into middleware

This commit is contained in:
Eugene Pankov 2021-12-27 20:08:22 +01:00
parent b0c300be43
commit 076b1c7129
No known key found for this signature in database
GPG Key ID: 5896FCBBDD1CF4F4
12 changed files with 195 additions and 97 deletions

View File

@ -3,7 +3,7 @@ import SerialPort from 'serialport'
import { LogService, NotificationsService, Profile } from 'tabby-core'
import { Subject, Observable } from 'rxjs'
import { Injector, NgZone } from '@angular/core'
import { BaseSession, LoginScriptsOptions, StreamProcessingOptions, TerminalStreamProcessor } from 'tabby-terminal'
import { BaseSession, LoginScriptsOptions, SessionMiddleware, StreamProcessingOptions, TerminalStreamProcessor } from 'tabby-terminal'
import { SerialService } from './services/serial.service'
export interface SerialProfile extends Profile {
@ -32,6 +32,14 @@ export interface SerialPortInfo {
description?: string
}
class SlowFeedMiddleware extends SessionMiddleware {
feedFromTerminal (data: Buffer): void {
for (const byte of data) {
this.outputToSession.next(Buffer.from([byte]))
}
}
}
export class SerialSession extends BaseSession {
serial: SerialPort
@ -50,13 +58,11 @@ export class SerialSession extends BaseSession {
this.notifications = injector.get(NotificationsService)
this.streamProcessor = new TerminalStreamProcessor(profile.options)
this.streamProcessor.outputToSession$.subscribe(data => {
this.serial?.write(data.toString())
})
this.streamProcessor.outputToTerminal$.subscribe(data => {
this.emitOutput(data)
this.loginScriptProcessor?.feedFromSession(data)
})
this.middleware.push(this.streamProcessor)
if (this.profile.options.slowSend) {
this.middleware.unshift(new SlowFeedMiddleware())
}
this.setLoginScriptsOptions(profile.options)
}
@ -110,7 +116,7 @@ export class SerialSession extends BaseSession {
setTimeout(() => this.streamProcessor.start())
this.serial.on('readable', () => {
this.streamProcessor.feedFromSession(this.serial.read())
this.emitOutput(this.serial.read())
})
this.serial.on('end', () => {
@ -124,17 +130,10 @@ export class SerialSession extends BaseSession {
}
write (data: Buffer): void {
if (!this.profile.options.slowSend) {
this.streamProcessor.feedFromTerminal(data)
} else {
for (const byte of data) {
this.streamProcessor.feedFromTerminal(Buffer.from([byte]))
}
}
this.serial?.write(data.toString())
}
async destroy (): Promise<void> {
this.streamProcessor.close()
this.serviceMessage.complete()
await super.destroy()
}

View File

@ -3,7 +3,7 @@ import colors from 'ansi-colors'
import stripAnsi from 'strip-ansi'
import { Injector } from '@angular/core'
import { Profile, LogService } from 'tabby-core'
import { BaseSession, LoginScriptsOptions, StreamProcessingOptions, TerminalStreamProcessor } from 'tabby-terminal'
import { BaseSession, LoginScriptsOptions, SessionMiddleware, StreamProcessingOptions, TerminalStreamProcessor } from 'tabby-terminal'
import { Subject, Observable } from 'rxjs'
@ -41,6 +41,21 @@ enum TelnetOptions {
NEW_ENVIRON = 0x27,
}
class UnescapeFFMiddleware extends SessionMiddleware {
feedFromSession (data: Buffer): void {
while (data.includes(0xff)) {
const pos = data.indexOf(0xff)
this.outputToTerminal.next(data.slice(0, pos))
this.outputToTerminal.next(Buffer.from([0xff, 0xff]))
data = data.slice(pos + 1)
}
this.outputToTerminal.next(data)
}
}
export class TelnetSession extends BaseSession {
get serviceMessage$ (): Observable<string> { return this.serviceMessage }
@ -48,7 +63,6 @@ export class TelnetSession extends BaseSession {
private socket: Socket
private streamProcessor: TerminalStreamProcessor
private telnetProtocol = false
private echoEnabled = false
private lastWidth = 0
private lastHeight = 0
private requestedOptions = new Set<number>()
@ -59,33 +73,10 @@ export class TelnetSession extends BaseSession {
) {
super(injector.get(LogService).create(`telnet-${profile.options.host}-${profile.options.port}`))
this.streamProcessor = new TerminalStreamProcessor(profile.options)
this.streamProcessor.outputToSession$.subscribe(data => {
this.socket.write(this.unescapeFF(data))
})
this.streamProcessor.outputToTerminal$.subscribe(data => {
this.emitOutput(data)
})
this.middleware.push(this.streamProcessor)
this.setLoginScriptsOptions(profile.options)
}
unescapeFF (data: Buffer): Buffer {
if (!this.telnetProtocol) {
return data
}
const result: Buffer[] = []
while (data.includes(0xff)) {
const pos = data.indexOf(0xff)
result.push(data.slice(0, pos))
result.push(Buffer.from([0xff, 0xff]))
data = data.slice(pos + 1)
}
result.push(data)
return Buffer.concat(result)
}
async start (): Promise<void> {
this.socket = new Socket()
this.emitServiceMessage(`Connecting to ${this.profile.options.host}`)
@ -124,6 +115,7 @@ export class TelnetSession extends BaseSession {
onData (data: Buffer): void {
if (!this.telnetProtocol && data[0] === TelnetCommands.IAC) {
this.telnetProtocol = true
this.middleware.push(new UnescapeFFMiddleware())
this.requestOption(TelnetCommands.DO, TelnetOptions.SUPPRESS_GO_AHEAD)
this.emitTelnet(TelnetCommands.WILL, TelnetOptions.TERMINAL_TYPE)
this.emitTelnet(TelnetCommands.WILL, TelnetOptions.NEGO_WINDOW_SIZE)
@ -131,7 +123,7 @@ export class TelnetSession extends BaseSession {
if (this.telnetProtocol) {
data = this.processTelnetProtocol(data)
}
this.streamProcessor.feedFromSession(data)
this.emitOutput(data)
}
emitTelnet (command: TelnetCommands, option: TelnetOptions): void {
@ -190,7 +182,7 @@ export class TelnetSession extends BaseSession {
this.emitTelnet(TelnetCommands.WILL, option)
this.emitSize()
} else if (option === TelnetOptions.ECHO) {
this.echoEnabled = true
this.streamProcessor.forceEcho = true
this.emitTelnet(TelnetCommands.WILL, option)
} else if (option === TelnetOptions.TERMINAL_TYPE) {
this.emitTelnet(TelnetCommands.WILL, option)
@ -201,7 +193,7 @@ export class TelnetSession extends BaseSession {
}
if (command === TelnetCommands.DONT) {
if (option === TelnetOptions.ECHO) {
this.echoEnabled = false
this.streamProcessor.forceEcho = false
this.emitTelnet(TelnetCommands.WONT, option)
} else {
this.logger.debug('(!) Unhandled option')
@ -249,10 +241,7 @@ export class TelnetSession extends BaseSession {
}
write (data: Buffer): void {
if (this.echoEnabled) {
this.emitOutput(data)
}
this.streamProcessor.feedFromTerminal(data)
this.socket.write(data)
}
kill (_signal?: string): void {

View File

@ -389,7 +389,7 @@ export class BaseTerminalTabComponent extends BaseTabComponent implements OnInit
if (!(data instanceof Buffer)) {
data = Buffer.from(data, 'utf-8')
}
this.session?.write(data)
this.session?.feedFromTerminal(data)
if (this.config.store.terminal.scrollOnInput) {
this.frontend?.scrollToBottom()
}

View File

@ -0,0 +1,101 @@
import { Subject, Observable } from 'rxjs'
import { SubscriptionContainer } from 'tabby-core'
export class SessionMiddleware {
get outputToSession$ (): Observable<Buffer> { return this.outputToSession }
get outputToTerminal$ (): Observable<Buffer> { return this.outputToTerminal }
protected outputToSession = new Subject<Buffer>()
protected outputToTerminal = new Subject<Buffer>()
feedFromSession (data: Buffer): void {
this.outputToTerminal.next(data)
}
feedFromTerminal (data: Buffer): void {
this.outputToSession.next(data)
}
close (): void {
this.outputToSession.complete()
this.outputToTerminal.complete()
}
}
export class SesssionMiddlewareStack extends SessionMiddleware {
private stack: SessionMiddleware[] = []
private subs = new SubscriptionContainer()
constructor () {
super()
this.push(new SessionMiddleware())
}
push (middleware: SessionMiddleware): void {
this.stack.push(middleware)
this.relink()
}
unshift (middleware: SessionMiddleware): void {
this.stack.unshift(middleware)
this.relink()
}
remove (middleware: SessionMiddleware): void {
this.stack = this.stack.filter(m => m !== middleware)
this.relink()
}
replace (middleware: SessionMiddleware, newMiddleware: SessionMiddleware): void {
const index = this.stack.indexOf(middleware)
if (index >= 0) {
this.stack[index].close()
this.stack[index] = newMiddleware
} else {
this.stack.push(newMiddleware)
}
this.relink()
}
feedFromSession (data: Buffer): void {
this.stack[0].feedFromSession(data)
}
feedFromTerminal (data: Buffer): void {
this.stack[this.stack.length - 1].feedFromTerminal(data)
}
close (): void {
for (const m of this.stack) {
m.close()
}
this.subs.cancelAll()
super.close()
}
private relink () {
this.subs.cancelAll()
for (let i = 0; i < this.stack.length - 1; i++) {
this.subs.subscribe(
this.stack[i].outputToTerminal$,
x => this.stack[i + 1].feedFromSession(x)
)
}
this.subs.subscribe(
this.stack[this.stack.length - 1].outputToTerminal$,
x => this.outputToTerminal.next(x),
)
for (let i = this.stack.length - 2; i >= 0; i--) {
this.subs.subscribe(
this.stack[i + 1].outputToSession$,
x => this.stack[i].feedFromTerminal(x)
)
}
this.subs.subscribe(
this.stack[0].outputToSession$,
x => this.outputToSession.next(x),
)
}
}

View File

@ -2,7 +2,7 @@
import { Component, Input } from '@angular/core'
import { PlatformService } from 'tabby-core'
import { LoginScript, LoginScriptsOptions } from '../api/loginScriptProcessing'
import { LoginScript, LoginScriptsOptions } from '../middleware/loginScriptProcessing'
/** @hidden */
@Component({

View File

@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
import { Component, Input } from '@angular/core'
import { StreamProcessingOptions } from '../api/streamProcessing'
import { StreamProcessingOptions } from '../middleware/streamProcessing'
/** @hidden */
@Component({

View File

@ -32,7 +32,7 @@ export class ZModemDecorator extends TerminalDecorator {
terminal.write(data)
}
},
sender: data => terminal.session!.write(Buffer.from(data)),
sender: data => terminal.session!.feedFromTerminal(Buffer.from(data)),
on_detect: async detection => {
try {
terminal.enablePassthrough = false

View File

@ -87,8 +87,9 @@ export { TerminalFrontendService, TerminalDecorator, TerminalContextMenuItemProv
export { Frontend, XTermFrontend, XTermWebGLFrontend }
export { BaseTerminalTabComponent } from './api/baseTerminalTab.component'
export * from './api/interfaces'
export * from './api/streamProcessing'
export * from './api/loginScriptProcessing'
export * from './api/osc1337Processing'
export * from './middleware/streamProcessing'
export * from './middleware/loginScriptProcessing'
export * from './middleware/oscProcessing'
export * from './api/middleware'
export * from './session'
export { LoginScriptsSettingsComponent, StreamProcessingSettingsComponent }

View File

@ -1,6 +1,6 @@
import deepClone from 'clone-deep'
import { Subject, Observable } from 'rxjs'
import { Logger } from 'tabby-core'
import { SessionMiddleware } from '../api/middleware'
export interface LoginScript {
expect: string
@ -13,10 +13,7 @@ export interface LoginScriptsOptions {
scripts?: LoginScript[]
}
export class LoginScriptProcessor {
get outputToSession$ (): Observable<Buffer> { return this.outputToSession }
private outputToSession = new Subject<Buffer>()
export class LoginScriptProcessor extends SessionMiddleware {
private remainingScripts: LoginScript[] = []
private escapeSeqMap = {
@ -34,6 +31,7 @@ export class LoginScriptProcessor {
private logger: Logger,
options: LoginScriptsOptions
) {
super()
this.remainingScripts = deepClone(options.scripts ?? [])
for (const script of this.remainingScripts) {
if (!script.isRegex) {
@ -43,10 +41,9 @@ export class LoginScriptProcessor {
}
}
feedFromSession (data: Buffer): boolean {
feedFromSession (data: Buffer): void {
const dataString = data.toString()
let found = false
for (const script of this.remainingScripts) {
if (!script.expect) {
continue
@ -60,14 +57,12 @@ export class LoginScriptProcessor {
}
if (match) {
found = true
this.logger.info('Executing script:', script)
this.outputToSession.next(Buffer.from(script.send + '\n'))
this.remainingScripts = this.remainingScripts.filter(x => x !== script)
} else {
if (script.optional) {
this.logger.debug('Skip optional script: ' + script.expect)
found = true
this.remainingScripts = this.remainingScripts.filter(x => x !== script)
} else {
break
@ -75,7 +70,7 @@ export class LoginScriptProcessor {
}
}
return found
super.feedFromSession(data)
}
close (): void {

View File

@ -1,17 +1,18 @@
import * as os from 'os'
import { Subject, Observable } from 'rxjs'
import { SessionMiddleware } from '../api/middleware'
const OSCPrefix = Buffer.from('\x1b]')
const OSCSuffix = Buffer.from('\x07')
export class OSCProcessor {
export class OSCProcessor extends SessionMiddleware {
get cwdReported$ (): Observable<string> { return this.cwdReported }
get copyRequested$ (): Observable<string> { return this.copyRequested }
private cwdReported = new Subject<string>()
private copyRequested = new Subject<string>()
process (data: Buffer): Buffer {
feedFromSession (data: Buffer): void {
let startIndex = 0
while (data.includes(OSCPrefix, startIndex) && data.includes(OSCSuffix, startIndex)) {
const params = data.subarray(data.indexOf(OSCPrefix, startIndex) + OSCPrefix.length)
@ -42,10 +43,12 @@ export class OSCProcessor {
continue
}
}
return data
super.feedFromSession(data)
}
close (): void {
this.cwdReported.complete()
this.copyRequested.complete()
super.close()
}
}

View File

@ -2,9 +2,10 @@ import hexdump from 'hexer'
import bufferReplace from 'buffer-replace'
import colors from 'ansi-colors'
import binstring from 'binstring'
import { Subject, Observable, interval, debounce } from 'rxjs'
import { interval, debounce } from 'rxjs'
import { PassThrough, Readable, Writable } from 'stream'
import { ReadLine, createInterface as createReadline, clearLine } from 'readline'
import { SessionMiddleware } from '../api/middleware'
export type InputMode = null | 'local-echo' | 'readline' | 'readline-hex'
export type OutputMode = null | 'hex'
@ -17,13 +18,8 @@ export interface StreamProcessingOptions {
outputNewlines?: NewlineMode
}
export class TerminalStreamProcessor {
get outputToSession$ (): Observable<Buffer> { return this.outputToSession }
get outputToTerminal$ (): Observable<Buffer> { return this.outputToTerminal }
protected outputToSession = new Subject<Buffer>()
protected outputToTerminal = new Subject<Buffer>()
export class TerminalStreamProcessor extends SessionMiddleware {
forceEcho = false
private inputReadline: ReadLine
private inputPromptVisible = false
private inputReadlineInStream: Readable & Writable
@ -31,6 +27,7 @@ export class TerminalStreamProcessor {
private started = false
constructor (private options: StreamProcessingOptions) {
super()
this.inputReadlineInStream = new PassThrough()
this.inputReadlineOutStream = new PassThrough()
this.inputReadlineOutStream.on('data', data => {
@ -85,7 +82,7 @@ export class TerminalStreamProcessor {
}
feedFromTerminal (data: Buffer): void {
if (this.options.inputMode === 'local-echo') {
if (this.options.inputMode === 'local-echo' || this.forceEcho) {
this.outputToTerminal.next(this.replaceNewlines(data, 'crlf'))
}
if (this.options.inputMode?.startsWith('readline')) {
@ -103,8 +100,7 @@ export class TerminalStreamProcessor {
close (): void {
this.inputReadline.close()
this.outputToSession.complete()
this.outputToTerminal.complete()
super.close()
}
private onTerminalInput (data: Buffer) {

View File

@ -1,7 +1,8 @@
import { Observable, Subject } from 'rxjs'
import { Logger } from 'tabby-core'
import { LoginScriptProcessor, LoginScriptsOptions } from './api/loginScriptProcessing'
import { OSCProcessor } from './api/osc1337Processing'
import { LoginScriptProcessor, LoginScriptsOptions } from './middleware/loginScriptProcessing'
import { OSCProcessor } from './middleware/oscProcessing'
import { SesssionMiddlewareStack } from './api/middleware'
/**
* A session object for a [[BaseTerminalTabComponent]]
@ -11,6 +12,7 @@ export abstract class BaseSession {
open: boolean
truePID?: number
oscProcessor = new OSCProcessor()
protected readonly middleware = new SesssionMiddlewareStack()
protected output = new Subject<string>()
protected binaryOutput = new Subject<Buffer>()
protected closed = new Subject<void>()
@ -26,20 +28,29 @@ export abstract class BaseSession {
get destroyed$ (): Observable<void> { return this.destroyed }
constructor (protected logger: Logger) {
this.middleware.push(this.oscProcessor)
this.oscProcessor.cwdReported$.subscribe(cwd => {
this.reportedCWD = cwd
})
this.middleware.outputToTerminal$.subscribe(data => {
if (!this.initialDataBufferReleased) {
this.initialDataBuffer = Buffer.concat([this.initialDataBuffer, data])
} else {
this.output.next(data.toString())
this.binaryOutput.next(data)
}
})
this.middleware.outputToSession$.subscribe(data => this.write(data))
}
emitOutput (data: Buffer): void {
data = this.oscProcessor.process(data)
if (!this.initialDataBufferReleased) {
this.initialDataBuffer = Buffer.concat([this.initialDataBuffer, data])
} else {
this.output.next(data.toString())
this.binaryOutput.next(data)
this.loginScriptProcessor?.feedFromSession(data)
}
feedFromTerminal (data: Buffer): void {
this.middleware.feedFromTerminal(data)
}
protected emitOutput (data: Buffer): void {
this.middleware.feedFromSession(data)
}
releaseInitialDataBuffer (): void {
@ -50,21 +61,24 @@ export abstract class BaseSession {
}
setLoginScriptsOptions (options: LoginScriptsOptions): void {
this.loginScriptProcessor?.close()
this.loginScriptProcessor = new LoginScriptProcessor(this.logger, options)
this.loginScriptProcessor.outputToSession$.subscribe(data => this.write(data))
const newProcessor = new LoginScriptProcessor(this.logger, options)
if (this.loginScriptProcessor) {
this.middleware.replace(this.loginScriptProcessor, newProcessor)
} else {
this.middleware.push(newProcessor)
}
this.loginScriptProcessor = newProcessor
}
async destroy (): Promise<void> {
if (this.open) {
this.logger.info('Destroying')
this.open = false
this.loginScriptProcessor?.close()
this.closed.next()
this.destroyed.next()
await this.gracefullyKillProcess()
}
this.oscProcessor.close()
this.middleware.close()
this.closed.complete()
this.destroyed.complete()
this.output.complete()